1use std::{io::Read, path::Path};
17
18use csv::{Reader, StringRecord};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22 enums::{BookAction, OrderSide, RecordFlag},
23 identifiers::InstrumentId,
24 types::Quantity,
25};
26#[cfg(feature = "python")]
27use nautilus_model::{
28 data::{Data, OrderBookDeltas, OrderBookDeltas_API},
29 python::data::data_to_pycapsule,
30};
31#[cfg(feature = "python")]
32use pyo3::{PyObject, Python};
33
34use crate::{
35 csv::{
36 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
37 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
38 record::{
39 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
40 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
41 },
42 },
43 parse::{parse_instrument_id, parse_timestamp},
44};
45
46struct DeltaStreamIterator {
52 reader: Reader<Box<dyn std::io::Read>>,
53 record: StringRecord,
54 buffer: Vec<OrderBookDelta>,
55 chunk_size: usize,
56 instrument_id: Option<InstrumentId>,
57 price_precision: u8,
58 size_precision: u8,
59 last_ts_event: UnixNanos,
60 limit: Option<usize>,
61 records_processed: usize,
62}
63
64impl DeltaStreamIterator {
65 fn new<P: AsRef<Path>>(
71 filepath: P,
72 chunk_size: usize,
73 price_precision: Option<u8>,
74 size_precision: Option<u8>,
75 instrument_id: Option<InstrumentId>,
76 limit: Option<usize>,
77 ) -> anyhow::Result<Self> {
78 let (final_price_precision, final_size_precision) =
79 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
80 (price_prec, size_prec)
82 } else {
83 let mut reader = create_csv_reader(&filepath)?;
85 let mut record = StringRecord::new();
86 let (detected_price, detected_size) =
87 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
88 (
89 price_precision.unwrap_or(detected_price),
90 size_precision.unwrap_or(detected_size),
91 )
92 };
93
94 let reader = create_csv_reader(filepath)?;
95
96 Ok(Self {
97 reader,
98 record: StringRecord::new(),
99 buffer: Vec::with_capacity(chunk_size),
100 chunk_size,
101 instrument_id,
102 price_precision: final_price_precision,
103 size_precision: final_size_precision,
104 last_ts_event: UnixNanos::default(),
105 limit,
106 records_processed: 0,
107 })
108 }
109
110 fn detect_precision_from_sample(
111 reader: &mut Reader<Box<dyn std::io::Read>>,
112 record: &mut StringRecord,
113 sample_size: usize,
114 ) -> anyhow::Result<(u8, u8)> {
115 let mut max_price_precision = 0u8;
116 let mut max_size_precision = 0u8;
117 let mut records_scanned = 0;
118
119 while records_scanned < sample_size {
120 match reader.read_record(record) {
121 Ok(true) => {
122 if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
123 max_price_precision = max_price_precision.max(infer_precision(data.price));
124 max_size_precision = max_size_precision.max(infer_precision(data.amount));
125 records_scanned += 1;
126 }
127 }
128 Ok(false) => break, Err(_) => records_scanned += 1, }
131 }
132
133 Ok((max_price_precision, max_size_precision))
134 }
135}
136
137impl Iterator for DeltaStreamIterator {
138 type Item = anyhow::Result<Vec<OrderBookDelta>>;
139
140 fn next(&mut self) -> Option<Self::Item> {
141 if let Some(limit) = self.limit
142 && self.records_processed >= limit
143 {
144 return None;
145 }
146
147 self.buffer.clear();
148 let mut records_read = 0;
149
150 while records_read < self.chunk_size {
151 match self.reader.read_record(&mut self.record) {
152 Ok(true) => {
153 match self.record.deserialize::<TardisBookUpdateRecord>(None) {
154 Ok(data) => {
155 let delta = parse_delta_record(
156 &data,
157 self.price_precision,
158 self.size_precision,
159 self.instrument_id,
160 );
161
162 if self.last_ts_event != delta.ts_event
164 && let Some(last_delta) = self.buffer.last_mut()
165 {
166 last_delta.flags = RecordFlag::F_LAST.value();
167 }
168
169 assert!(
170 !(delta.action != BookAction::Delete && delta.order.size.is_zero()),
171 "Invalid delta: action {} when size zero, check size_precision ({}) vs data; {data:?}",
172 delta.action,
173 self.size_precision
174 );
175
176 self.last_ts_event = delta.ts_event;
177
178 self.buffer.push(delta);
179 records_read += 1;
180 self.records_processed += 1;
181
182 if let Some(limit) = self.limit
183 && self.records_processed >= limit
184 {
185 break;
186 }
187 }
188 Err(e) => {
189 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
190 }
191 }
192 }
193 Ok(false) => {
194 if self.buffer.is_empty() {
196 return None;
197 }
198 if let Some(last_delta) = self.buffer.last_mut() {
200 last_delta.flags = RecordFlag::F_LAST.value();
201 }
202 return Some(Ok(self.buffer.clone()));
203 }
204 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
205 }
206 }
207
208 if self.buffer.is_empty() {
209 None
210 } else {
211 Some(Ok(self.buffer.clone()))
212 }
213 }
214}
215
216pub fn stream_deltas<P: AsRef<Path>>(
231 filepath: P,
232 chunk_size: usize,
233 price_precision: Option<u8>,
234 size_precision: Option<u8>,
235 instrument_id: Option<InstrumentId>,
236 limit: Option<usize>,
237) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
238 DeltaStreamIterator::new(
239 filepath,
240 chunk_size,
241 price_precision,
242 size_precision,
243 instrument_id,
244 limit,
245 )
246}
247
/// Streaming state that groups book-update rows into per-timestamp batches and
/// yields them as Python objects.
#[cfg(feature = "python")]
struct BatchedDeltasStreamIterator {
    /// CSV reader over the boxed input stream.
    reader: Reader<Box<dyn Read>>,
    /// Scratch record reused for every row that is read.
    record: StringRecord,
    /// Python capsules produced for the chunk being returned.
    buffer: Vec<PyObject>,
    /// Deltas sharing the `ts_event` currently being accumulated.
    current_batch: Vec<OrderBookDelta>,
    /// Completed batches waiting to be converted into capsules.
    pending_batches: Vec<Vec<OrderBookDelta>>,
    /// Upper bound on the number of batches per yielded chunk.
    chunk_size: usize,
    /// Instrument ID applied to every delta and every batch.
    instrument_id: InstrumentId,
    /// Price precision forwarded to `parse_delta_record`.
    price_precision: u8,
    /// Size precision forwarded to `parse_delta_record`.
    size_precision: u8,
    /// `ts_event` of the most recently parsed delta.
    last_ts_event: UnixNanos,
    /// Optional cap on the total number of records processed.
    limit: Option<usize>,
    /// Running count of records parsed so far.
    records_processed: usize,
}
268
#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Builds a batched delta stream over the CSV file at `filepath`.
    ///
    /// The first record is read eagerly: it supplies the instrument ID when
    /// `instrument_id` is `None`. When a precision is `None` it is inferred
    /// from that first record plus a sample of up to 10,000 following records.
    /// A fresh reader is then created for the actual streaming.
    ///
    /// # Errors
    ///
    /// Returns an error if the reader cannot be created, if the file contains
    /// no records ("CSV file is empty"), or if the first record cannot be read
    /// or deserialized.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        // Keep the first row's numeric fields: `reader` has already moved past
        // this row, so the sampler below would otherwise never see it.
        let first_price = first_record.price;
        let first_amount = first_record.amount;

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            match (price_precision, size_precision) {
                (Some(price_prec), Some(size_prec)) => (price_prec, size_prec),
                (price_prec, size_prec) => {
                    let (sampled_price, sampled_size) =
                        Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                    let detected_price = sampled_price.max(infer_precision(first_price));
                    let detected_size = sampled_size.max(infer_precision(first_amount));
                    (
                        price_prec.unwrap_or(detected_price),
                        size_prec.unwrap_or(detected_size),
                    )
                }
            };

        // Streaming uses a new reader rather than the partially consumed one.
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            limit,
            records_processed: 0,
        })
    }

    /// Scans `reader` from its current position and returns the largest
    /// `(price, size)` precision that `infer_precision` reports.
    ///
    /// The scan stops at end of input or once `sample_size` has been reached.
    /// Rows that deserialize and read errors both count towards the sample;
    /// rows that are read but fail to deserialize are skipped uncounted.
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,
                Err(_) => records_scanned += 1,
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}
355
356#[cfg(feature = "python")]
357impl Iterator for BatchedDeltasStreamIterator {
358 type Item = anyhow::Result<Vec<PyObject>>;
359
360 fn next(&mut self) -> Option<Self::Item> {
361 if let Some(limit) = self.limit
362 && self.records_processed >= limit
363 {
364 return None;
365 }
366
367 self.buffer.clear();
368 let mut batches_created = 0;
369
370 while batches_created < self.chunk_size {
371 match self.reader.read_record(&mut self.record) {
372 Ok(true) => {
373 let delta = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
374 Ok(data) => parse_delta_record(
375 &data,
376 self.price_precision,
377 self.size_precision,
378 Some(self.instrument_id),
379 ),
380 Err(e) => {
381 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
382 }
383 };
384
385 if self.last_ts_event != delta.ts_event && !self.current_batch.is_empty() {
386 if let Some(last_delta) = self.current_batch.last_mut() {
388 last_delta.flags = RecordFlag::F_LAST.value();
389 }
390 self.pending_batches
391 .push(std::mem::take(&mut self.current_batch));
392 batches_created += 1;
393 }
394
395 self.last_ts_event = delta.ts_event;
396 self.current_batch.push(delta);
397 self.records_processed += 1;
398
399 if let Some(limit) = self.limit
400 && self.records_processed >= limit
401 {
402 break;
403 }
404 }
405 Ok(false) => {
406 break;
408 }
409 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
410 }
411 }
412
413 if !self.current_batch.is_empty() && batches_created < self.chunk_size {
414 if let Some(last_delta) = self.current_batch.last_mut() {
416 last_delta.flags = RecordFlag::F_LAST.value();
417 }
418 self.pending_batches
419 .push(std::mem::take(&mut self.current_batch));
420 }
421
422 if self.pending_batches.is_empty() {
423 None
424 } else {
425 Python::with_gil(|py| {
427 for batch in self.pending_batches.drain(..) {
428 let deltas = OrderBookDeltas::new(self.instrument_id, batch);
429 let deltas = OrderBookDeltas_API::new(deltas);
430 let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
431 self.buffer.push(capsule);
432 }
433 });
434 Some(Ok(std::mem::take(&mut self.buffer)))
435 }
436 }
437}
438
/// Streams per-timestamp batches of order book deltas from the Tardis CSV file
/// at `filepath` as Python objects, at most `chunk_size` batches per chunk.
///
/// # Errors
///
/// Returns an error if the underlying iterator cannot be constructed.
#[cfg(feature = "python")]
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<PyObject>>>> {
    let stream = BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )?;
    Ok(stream)
}
463
464struct QuoteStreamIterator {
470 reader: Reader<Box<dyn Read>>,
471 record: StringRecord,
472 buffer: Vec<QuoteTick>,
473 chunk_size: usize,
474 instrument_id: Option<InstrumentId>,
475 price_precision: u8,
476 size_precision: u8,
477 limit: Option<usize>,
478 records_processed: usize,
479}
480
481impl QuoteStreamIterator {
482 pub fn new<P: AsRef<Path>>(
488 filepath: P,
489 chunk_size: usize,
490 price_precision: Option<u8>,
491 size_precision: Option<u8>,
492 instrument_id: Option<InstrumentId>,
493 limit: Option<usize>,
494 ) -> anyhow::Result<Self> {
495 let (final_price_precision, final_size_precision) =
496 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
497 (price_prec, size_prec)
499 } else {
500 let mut reader = create_csv_reader(&filepath)?;
502 let mut record = StringRecord::new();
503 let (detected_price, detected_size) =
504 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
505 (
506 price_precision.unwrap_or(detected_price),
507 size_precision.unwrap_or(detected_size),
508 )
509 };
510
511 let reader = create_csv_reader(filepath)?;
512
513 Ok(Self {
514 reader,
515 record: StringRecord::new(),
516 buffer: Vec::with_capacity(chunk_size),
517 chunk_size,
518 instrument_id,
519 price_precision: final_price_precision,
520 size_precision: final_size_precision,
521 limit,
522 records_processed: 0,
523 })
524 }
525
526 fn detect_precision_from_sample(
527 reader: &mut Reader<Box<dyn std::io::Read>>,
528 record: &mut StringRecord,
529 sample_size: usize,
530 ) -> anyhow::Result<(u8, u8)> {
531 let mut max_price_precision = 2u8;
532 let mut max_size_precision = 0u8;
533 let mut records_scanned = 0;
534
535 while records_scanned < sample_size {
536 match reader.read_record(record) {
537 Ok(true) => {
538 if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
539 if let Some(bid_price_val) = data.bid_price {
540 max_price_precision =
541 max_price_precision.max(infer_precision(bid_price_val));
542 }
543 if let Some(ask_price_val) = data.ask_price {
544 max_price_precision =
545 max_price_precision.max(infer_precision(ask_price_val));
546 }
547 if let Some(bid_amount_val) = data.bid_amount {
548 max_size_precision =
549 max_size_precision.max(infer_precision(bid_amount_val));
550 }
551 if let Some(ask_amount_val) = data.ask_amount {
552 max_size_precision =
553 max_size_precision.max(infer_precision(ask_amount_val));
554 }
555 records_scanned += 1;
556 }
557 }
558 Ok(false) => break, Err(_) => records_scanned += 1, }
561 }
562
563 Ok((max_price_precision, max_size_precision))
564 }
565}
566
567impl Iterator for QuoteStreamIterator {
568 type Item = anyhow::Result<Vec<QuoteTick>>;
569
570 fn next(&mut self) -> Option<Self::Item> {
571 if let Some(limit) = self.limit
572 && self.records_processed >= limit
573 {
574 return None;
575 }
576
577 self.buffer.clear();
578 let mut records_read = 0;
579
580 while records_read < self.chunk_size {
581 match self.reader.read_record(&mut self.record) {
582 Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
583 Ok(data) => {
584 let quote = parse_quote_record(
585 &data,
586 self.price_precision,
587 self.size_precision,
588 self.instrument_id,
589 );
590
591 self.buffer.push(quote);
592 records_read += 1;
593 self.records_processed += 1;
594
595 if let Some(limit) = self.limit
596 && self.records_processed >= limit
597 {
598 break;
599 }
600 }
601 Err(e) => {
602 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
603 }
604 },
605 Ok(false) => {
606 if self.buffer.is_empty() {
607 return None;
608 }
609 return Some(Ok(self.buffer.clone()));
610 }
611 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
612 }
613 }
614
615 if self.buffer.is_empty() {
616 None
617 } else {
618 Some(Ok(self.buffer.clone()))
619 }
620 }
621}
622
623pub fn stream_quotes<P: AsRef<Path>>(
638 filepath: P,
639 chunk_size: usize,
640 price_precision: Option<u8>,
641 size_precision: Option<u8>,
642 instrument_id: Option<InstrumentId>,
643 limit: Option<usize>,
644) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
645 QuoteStreamIterator::new(
646 filepath,
647 chunk_size,
648 price_precision,
649 size_precision,
650 instrument_id,
651 limit,
652 )
653}
654
655struct TradeStreamIterator {
661 reader: Reader<Box<dyn Read>>,
662 record: StringRecord,
663 buffer: Vec<TradeTick>,
664 chunk_size: usize,
665 instrument_id: Option<InstrumentId>,
666 price_precision: u8,
667 size_precision: u8,
668 limit: Option<usize>,
669 records_processed: usize,
670}
671
672impl TradeStreamIterator {
673 pub fn new<P: AsRef<Path>>(
679 filepath: P,
680 chunk_size: usize,
681 price_precision: Option<u8>,
682 size_precision: Option<u8>,
683 instrument_id: Option<InstrumentId>,
684 limit: Option<usize>,
685 ) -> anyhow::Result<Self> {
686 let (final_price_precision, final_size_precision) =
687 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
688 (price_prec, size_prec)
690 } else {
691 let mut reader = create_csv_reader(&filepath)?;
693 let mut record = StringRecord::new();
694 let (detected_price, detected_size) =
695 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
696 (
697 price_precision.unwrap_or(detected_price),
698 size_precision.unwrap_or(detected_size),
699 )
700 };
701
702 let reader = create_csv_reader(filepath)?;
703
704 Ok(Self {
705 reader,
706 record: StringRecord::new(),
707 buffer: Vec::with_capacity(chunk_size),
708 chunk_size,
709 instrument_id,
710 price_precision: final_price_precision,
711 size_precision: final_size_precision,
712 limit,
713 records_processed: 0,
714 })
715 }
716
717 fn detect_precision_from_sample(
718 reader: &mut Reader<Box<dyn std::io::Read>>,
719 record: &mut StringRecord,
720 sample_size: usize,
721 ) -> anyhow::Result<(u8, u8)> {
722 let mut max_price_precision = 2u8;
723 let mut max_size_precision = 0u8;
724 let mut records_scanned = 0;
725
726 while records_scanned < sample_size {
727 match reader.read_record(record) {
728 Ok(true) => {
729 if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
730 max_price_precision = max_price_precision.max(infer_precision(data.price));
731 max_size_precision = max_size_precision.max(infer_precision(data.amount));
732 records_scanned += 1;
733 }
734 }
735 Ok(false) => break, Err(_) => records_scanned += 1, }
738 }
739
740 Ok((max_price_precision, max_size_precision))
741 }
742}
743
744impl Iterator for TradeStreamIterator {
745 type Item = anyhow::Result<Vec<TradeTick>>;
746
747 fn next(&mut self) -> Option<Self::Item> {
748 if let Some(limit) = self.limit
749 && self.records_processed >= limit
750 {
751 return None;
752 }
753
754 self.buffer.clear();
755 let mut records_read = 0;
756
757 while records_read < self.chunk_size {
758 match self.reader.read_record(&mut self.record) {
759 Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
760 Ok(data) => {
761 let size = Quantity::new(data.amount, self.size_precision);
762
763 if size.is_positive() {
764 let trade = parse_trade_record(
765 &data,
766 size,
767 self.price_precision,
768 self.instrument_id,
769 );
770
771 self.buffer.push(trade);
772 records_read += 1;
773 self.records_processed += 1;
774
775 if let Some(limit) = self.limit
776 && self.records_processed >= limit
777 {
778 break;
779 }
780 } else {
781 log::warn!("Skipping zero-sized trade: {data:?}");
782 }
783 }
784 Err(e) => {
785 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
786 }
787 },
788 Ok(false) => {
789 if self.buffer.is_empty() {
790 return None;
791 }
792 return Some(Ok(self.buffer.clone()));
793 }
794 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
795 }
796 }
797
798 if self.buffer.is_empty() {
799 None
800 } else {
801 Some(Ok(self.buffer.clone()))
802 }
803 }
804}
805
806pub fn stream_trades<P: AsRef<Path>>(
821 filepath: P,
822 chunk_size: usize,
823 price_precision: Option<u8>,
824 size_precision: Option<u8>,
825 instrument_id: Option<InstrumentId>,
826 limit: Option<usize>,
827) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
828 TradeStreamIterator::new(
829 filepath,
830 chunk_size,
831 price_precision,
832 size_precision,
833 instrument_id,
834 limit,
835 )
836}
837
838struct Depth10StreamIterator {
844 reader: Reader<Box<dyn Read>>,
845 record: StringRecord,
846 buffer: Vec<OrderBookDepth10>,
847 chunk_size: usize,
848 levels: u8,
849 instrument_id: Option<InstrumentId>,
850 price_precision: u8,
851 size_precision: u8,
852 limit: Option<usize>,
853 records_processed: usize,
854}
855
856impl Depth10StreamIterator {
857 pub fn new<P: AsRef<Path>>(
863 filepath: P,
864 chunk_size: usize,
865 levels: u8,
866 price_precision: Option<u8>,
867 size_precision: Option<u8>,
868 instrument_id: Option<InstrumentId>,
869 limit: Option<usize>,
870 ) -> anyhow::Result<Self> {
871 let (final_price_precision, final_size_precision) =
872 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
873 (price_prec, size_prec)
875 } else {
876 let mut reader = create_csv_reader(&filepath)?;
878 let mut record = StringRecord::new();
879 let (detected_price, detected_size) =
880 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
881 (
882 price_precision.unwrap_or(detected_price),
883 size_precision.unwrap_or(detected_size),
884 )
885 };
886
887 let reader = create_csv_reader(filepath)?;
888
889 Ok(Self {
890 reader,
891 record: StringRecord::new(),
892 buffer: Vec::with_capacity(chunk_size),
893 chunk_size,
894 levels,
895 instrument_id,
896 price_precision: final_price_precision,
897 size_precision: final_size_precision,
898 limit,
899 records_processed: 0,
900 })
901 }
902
903 fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
904 let instrument_id = self
905 .instrument_id
906 .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
907
908 let mut bids = [NULL_ORDER; DEPTH10_LEN];
909 let mut asks = [NULL_ORDER; DEPTH10_LEN];
910 let mut bid_counts = [0_u32; DEPTH10_LEN];
911 let mut ask_counts = [0_u32; DEPTH10_LEN];
912
913 for i in 0..5 {
915 let (bid_price, bid_amount) = match i {
916 0 => (data.bids_0_price, data.bids_0_amount),
917 1 => (data.bids_1_price, data.bids_1_amount),
918 2 => (data.bids_2_price, data.bids_2_amount),
919 3 => (data.bids_3_price, data.bids_3_amount),
920 4 => (data.bids_4_price, data.bids_4_amount),
921 _ => unreachable!(),
922 };
923
924 let (ask_price, ask_amount) = match i {
925 0 => (data.asks_0_price, data.asks_0_amount),
926 1 => (data.asks_1_price, data.asks_1_amount),
927 2 => (data.asks_2_price, data.asks_2_amount),
928 3 => (data.asks_3_price, data.asks_3_amount),
929 4 => (data.asks_4_price, data.asks_4_amount),
930 _ => unreachable!(),
931 };
932
933 let (bid_order, bid_count) = create_book_order(
934 OrderSide::Buy,
935 bid_price,
936 bid_amount,
937 self.price_precision,
938 self.size_precision,
939 );
940 bids[i] = bid_order;
941 bid_counts[i] = bid_count;
942
943 let (ask_order, ask_count) = create_book_order(
944 OrderSide::Sell,
945 ask_price,
946 ask_amount,
947 self.price_precision,
948 self.size_precision,
949 );
950 asks[i] = ask_order;
951 ask_counts[i] = ask_count;
952 }
953
954 let flags = RecordFlag::F_SNAPSHOT.value();
955 let sequence = 0;
956 let ts_event = parse_timestamp(data.timestamp);
957 let ts_init = parse_timestamp(data.local_timestamp);
958
959 OrderBookDepth10::new(
960 instrument_id,
961 bids,
962 asks,
963 bid_counts,
964 ask_counts,
965 flags,
966 sequence,
967 ts_event,
968 ts_init,
969 )
970 }
971
972 fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
973 let instrument_id = self
974 .instrument_id
975 .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
976
977 let mut bids = [NULL_ORDER; DEPTH10_LEN];
978 let mut asks = [NULL_ORDER; DEPTH10_LEN];
979 let mut bid_counts = [0_u32; DEPTH10_LEN];
980 let mut ask_counts = [0_u32; DEPTH10_LEN];
981
982 for i in 0..DEPTH10_LEN {
984 let (bid_price, bid_amount) = match i {
985 0 => (data.bids_0_price, data.bids_0_amount),
986 1 => (data.bids_1_price, data.bids_1_amount),
987 2 => (data.bids_2_price, data.bids_2_amount),
988 3 => (data.bids_3_price, data.bids_3_amount),
989 4 => (data.bids_4_price, data.bids_4_amount),
990 5 => (data.bids_5_price, data.bids_5_amount),
991 6 => (data.bids_6_price, data.bids_6_amount),
992 7 => (data.bids_7_price, data.bids_7_amount),
993 8 => (data.bids_8_price, data.bids_8_amount),
994 9 => (data.bids_9_price, data.bids_9_amount),
995 _ => unreachable!(),
996 };
997
998 let (ask_price, ask_amount) = match i {
999 0 => (data.asks_0_price, data.asks_0_amount),
1000 1 => (data.asks_1_price, data.asks_1_amount),
1001 2 => (data.asks_2_price, data.asks_2_amount),
1002 3 => (data.asks_3_price, data.asks_3_amount),
1003 4 => (data.asks_4_price, data.asks_4_amount),
1004 5 => (data.asks_5_price, data.asks_5_amount),
1005 6 => (data.asks_6_price, data.asks_6_amount),
1006 7 => (data.asks_7_price, data.asks_7_amount),
1007 8 => (data.asks_8_price, data.asks_8_amount),
1008 9 => (data.asks_9_price, data.asks_9_amount),
1009 _ => unreachable!(),
1010 };
1011
1012 let (bid_order, bid_count) = create_book_order(
1013 OrderSide::Buy,
1014 bid_price,
1015 bid_amount,
1016 self.price_precision,
1017 self.size_precision,
1018 );
1019 bids[i] = bid_order;
1020 bid_counts[i] = bid_count;
1021
1022 let (ask_order, ask_count) = create_book_order(
1023 OrderSide::Sell,
1024 ask_price,
1025 ask_amount,
1026 self.price_precision,
1027 self.size_precision,
1028 );
1029 asks[i] = ask_order;
1030 ask_counts[i] = ask_count;
1031 }
1032
1033 let flags = RecordFlag::F_SNAPSHOT.value();
1034 let sequence = 0;
1035 let ts_event = parse_timestamp(data.timestamp);
1036 let ts_init = parse_timestamp(data.local_timestamp);
1037
1038 OrderBookDepth10::new(
1039 instrument_id,
1040 bids,
1041 asks,
1042 bid_counts,
1043 ask_counts,
1044 flags,
1045 sequence,
1046 ts_event,
1047 ts_init,
1048 )
1049 }
1050
1051 fn detect_precision_from_sample(
1052 reader: &mut Reader<Box<dyn std::io::Read>>,
1053 record: &mut StringRecord,
1054 sample_size: usize,
1055 ) -> anyhow::Result<(u8, u8)> {
1056 let mut max_price_precision = 2u8;
1057 let mut max_size_precision = 0u8;
1058 let mut records_scanned = 0;
1059
1060 while records_scanned < sample_size {
1061 match reader.read_record(record) {
1062 Ok(true) => {
1063 if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
1065 if let Some(bid_price) = data.bids_0_price {
1066 max_price_precision =
1067 max_price_precision.max(infer_precision(bid_price));
1068 }
1069 if let Some(ask_price) = data.asks_0_price {
1070 max_price_precision =
1071 max_price_precision.max(infer_precision(ask_price));
1072 }
1073 if let Some(bid_amount) = data.bids_0_amount {
1074 max_size_precision =
1075 max_size_precision.max(infer_precision(bid_amount));
1076 }
1077 if let Some(ask_amount) = data.asks_0_amount {
1078 max_size_precision =
1079 max_size_precision.max(infer_precision(ask_amount));
1080 }
1081 records_scanned += 1;
1082 } else if let Ok(data) =
1083 record.deserialize::<TardisOrderBookSnapshot25Record>(None)
1084 {
1085 if let Some(bid_price) = data.bids_0_price {
1086 max_price_precision =
1087 max_price_precision.max(infer_precision(bid_price));
1088 }
1089 if let Some(ask_price) = data.asks_0_price {
1090 max_price_precision =
1091 max_price_precision.max(infer_precision(ask_price));
1092 }
1093 if let Some(bid_amount) = data.bids_0_amount {
1094 max_size_precision =
1095 max_size_precision.max(infer_precision(bid_amount));
1096 }
1097 if let Some(ask_amount) = data.asks_0_amount {
1098 max_size_precision =
1099 max_size_precision.max(infer_precision(ask_amount));
1100 }
1101 records_scanned += 1;
1102 }
1103 }
1104 Ok(false) => break, Err(_) => records_scanned += 1, }
1107 }
1108
1109 Ok((max_price_precision, max_size_precision))
1110 }
1111}
1112
1113impl Iterator for Depth10StreamIterator {
1114 type Item = anyhow::Result<Vec<OrderBookDepth10>>;
1115
1116 fn next(&mut self) -> Option<Self::Item> {
1117 if let Some(limit) = self.limit
1118 && self.records_processed >= limit
1119 {
1120 return None;
1121 }
1122
1123 if !self.buffer.is_empty() {
1124 let chunk = self.buffer.split_off(0);
1125 return Some(Ok(chunk));
1126 }
1127
1128 self.buffer.clear();
1129 let mut records_read = 0;
1130
1131 while records_read < self.chunk_size {
1132 match self.reader.read_record(&mut self.record) {
1133 Ok(true) => {
1134 let result = match self.levels {
1135 5 => self
1136 .record
1137 .deserialize::<TardisOrderBookSnapshot5Record>(None)
1138 .map(|data| self.process_snapshot5(data)),
1139 25 => self
1140 .record
1141 .deserialize::<TardisOrderBookSnapshot25Record>(None)
1142 .map(|data| self.process_snapshot25(data)),
1143 _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
1144 };
1145
1146 match result {
1147 Ok(depth) => {
1148 self.buffer.push(depth);
1149 records_read += 1;
1150 self.records_processed += 1;
1151
1152 if let Some(limit) = self.limit
1153 && self.records_processed >= limit
1154 {
1155 break;
1156 }
1157 }
1158 Err(e) => {
1159 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
1160 }
1161 }
1162 }
1163 Ok(false) => {
1164 if self.buffer.is_empty() {
1165 return None;
1166 }
1167 let chunk = self.buffer.split_off(0);
1168 return Some(Ok(chunk));
1169 }
1170 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1171 }
1172 }
1173
1174 if self.buffer.is_empty() {
1175 None
1176 } else {
1177 let chunk = self.buffer.split_off(0);
1178 Some(Ok(chunk))
1179 }
1180 }
1181}
1182
1183pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
1198 filepath: P,
1199 chunk_size: usize,
1200 price_precision: Option<u8>,
1201 size_precision: Option<u8>,
1202 instrument_id: Option<InstrumentId>,
1203 limit: Option<usize>,
1204) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1205 Depth10StreamIterator::new(
1206 filepath,
1207 chunk_size,
1208 5,
1209 price_precision,
1210 size_precision,
1211 instrument_id,
1212 limit,
1213 )
1214}
1215
1216pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
1231 filepath: P,
1232 chunk_size: usize,
1233 price_precision: Option<u8>,
1234 size_precision: Option<u8>,
1235 instrument_id: Option<InstrumentId>,
1236 limit: Option<usize>,
1237) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1238 Depth10StreamIterator::new(
1239 filepath,
1240 chunk_size,
1241 25,
1242 price_precision,
1243 size_precision,
1244 instrument_id,
1245 limit,
1246 )
1247}
1248
1249use nautilus_model::data::FundingRateUpdate;
1254
1255use crate::csv::record::TardisDerivativeTickerRecord;
1256
1257struct FundingRateStreamIterator {
1259 reader: Reader<Box<dyn Read>>,
1260 record: StringRecord,
1261 buffer: Vec<FundingRateUpdate>,
1262 chunk_size: usize,
1263 instrument_id: Option<InstrumentId>,
1264 limit: Option<usize>,
1265 records_processed: usize,
1266}
1267
1268impl FundingRateStreamIterator {
1269 fn new<P: AsRef<Path>>(
1275 filepath: P,
1276 chunk_size: usize,
1277 instrument_id: Option<InstrumentId>,
1278 limit: Option<usize>,
1279 ) -> anyhow::Result<Self> {
1280 let reader = create_csv_reader(filepath)?;
1281
1282 Ok(Self {
1283 reader,
1284 record: StringRecord::new(),
1285 buffer: Vec::with_capacity(chunk_size),
1286 chunk_size,
1287 instrument_id,
1288 limit,
1289 records_processed: 0,
1290 })
1291 }
1292}
1293
1294impl Iterator for FundingRateStreamIterator {
1295 type Item = anyhow::Result<Vec<FundingRateUpdate>>;
1296
1297 fn next(&mut self) -> Option<Self::Item> {
1298 if let Some(limit) = self.limit
1299 && self.records_processed >= limit
1300 {
1301 return None;
1302 }
1303
1304 if !self.buffer.is_empty() {
1305 let chunk = self.buffer.split_off(0);
1306 return Some(Ok(chunk));
1307 }
1308
1309 self.buffer.clear();
1310 let mut records_read = 0;
1311
1312 while records_read < self.chunk_size {
1313 match self.reader.read_record(&mut self.record) {
1314 Ok(true) => {
1315 let result = self
1316 .record
1317 .deserialize::<TardisDerivativeTickerRecord>(None)
1318 .map_err(anyhow::Error::from)
1319 .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));
1320
1321 match result {
1322 Ok(Some(funding_rate)) => {
1323 self.buffer.push(funding_rate);
1324 records_read += 1;
1325 self.records_processed += 1;
1326
1327 if let Some(limit) = self.limit
1328 && self.records_processed >= limit
1329 {
1330 break;
1331 }
1332 }
1333 Ok(None) => {
1334 self.records_processed += 1;
1336 }
1337 Err(e) => {
1338 return Some(Err(anyhow::anyhow!(
1339 "Failed to parse funding rate record: {e}"
1340 )));
1341 }
1342 }
1343 }
1344 Ok(false) => {
1345 if self.buffer.is_empty() {
1346 return None;
1347 }
1348 let chunk = self.buffer.split_off(0);
1349 return Some(Ok(chunk));
1350 }
1351 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1352 }
1353 }
1354
1355 if self.buffer.is_empty() {
1356 None
1357 } else {
1358 let chunk = self.buffer.split_off(0);
1359 Some(Ok(chunk))
1360 }
1361 }
1362}
1363
1364pub fn stream_funding_rates<P: AsRef<Path>>(
1374 filepath: P,
1375 chunk_size: usize,
1376 instrument_id: Option<InstrumentId>,
1377 limit: Option<usize>,
1378) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
1379 FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
1380}
1381
1382#[cfg(test)]
1386mod tests {
1387 use nautilus_model::{
1388 enums::AggressorSide,
1389 identifiers::TradeId,
1390 types::{Price, Quantity},
1391 };
1392 use rstest::*;
1393
1394 use super::*;
1395 use crate::{csv::load::load_deltas, parse::parse_price, tests::get_test_data_path};
1396
1397 #[rstest]
1398 #[case(0.0, 0)]
1399 #[case(42.0, 0)]
1400 #[case(0.1, 1)]
1401 #[case(0.25, 2)]
1402 #[case(123.0001, 4)]
1403 #[case(-42.987654321, 9)]
1404 #[case(1.234_567_890_123, 12)]
1405 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
1406 assert_eq!(infer_precision(input), expected);
1407 }
1408
1409 #[rstest]
1410 pub fn test_stream_deltas_chunked() {
1411 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1412binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1413binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1414binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1415binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1416binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1417
1418 let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
1419 std::fs::write(&temp_file, csv_data).unwrap();
1420
1421 let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1422 let chunks: Vec<_> = stream.collect();
1423
1424 assert_eq!(chunks.len(), 3);
1425
1426 let chunk1 = chunks[0].as_ref().unwrap();
1427 assert_eq!(chunk1.len(), 2);
1428 assert_eq!(chunk1[0].order.price.precision, 4);
1429 assert_eq!(chunk1[1].order.price.precision, 4);
1430
1431 let chunk2 = chunks[1].as_ref().unwrap();
1432 assert_eq!(chunk2.len(), 2);
1433 assert_eq!(chunk2[0].order.price.precision, 4);
1434 assert_eq!(chunk2[1].order.price.precision, 4);
1435
1436 let chunk3 = chunks[2].as_ref().unwrap();
1437 assert_eq!(chunk3.len(), 1);
1438 assert_eq!(chunk3[0].order.price.precision, 4);
1439
1440 let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1441 assert_eq!(total_deltas, 5);
1442
1443 std::fs::remove_file(&temp_file).ok();
1444 }
1445
1446 #[rstest]
1447 pub fn test_stream_quotes_chunked() {
1448 let csv_data =
1449 "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
1450binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
1451binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
1452binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
1453binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
1454binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";
1455
1456 let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
1457 std::fs::write(&temp_file, csv_data).unwrap();
1458
1459 let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1460 let chunks: Vec<_> = stream.collect();
1461
1462 assert_eq!(chunks.len(), 3);
1463
1464 let chunk1 = chunks[0].as_ref().unwrap();
1465 assert_eq!(chunk1.len(), 2);
1466 assert_eq!(chunk1[0].bid_price.precision, 4);
1467 assert_eq!(chunk1[1].bid_price.precision, 4);
1468
1469 let chunk2 = chunks[1].as_ref().unwrap();
1470 assert_eq!(chunk2.len(), 2);
1471 assert_eq!(chunk2[0].bid_price.precision, 4);
1472 assert_eq!(chunk2[1].bid_price.precision, 4);
1473
1474 let chunk3 = chunks[2].as_ref().unwrap();
1475 assert_eq!(chunk3.len(), 1);
1476 assert_eq!(chunk3[0].bid_price.precision, 4);
1477
1478 let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1479 assert_eq!(total_quotes, 5);
1480
1481 std::fs::remove_file(&temp_file).ok();
1482 }
1483
1484 #[rstest]
1485 pub fn test_stream_trades_chunked() {
1486 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1487binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1488binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
1489binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1490binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
1491binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
1492
1493 let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
1494 std::fs::write(&temp_file, csv_data).unwrap();
1495
1496 let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1497 let chunks: Vec<_> = stream.collect();
1498
1499 assert_eq!(chunks.len(), 2);
1500
1501 let chunk1 = chunks[0].as_ref().unwrap();
1502 assert_eq!(chunk1.len(), 3);
1503 assert_eq!(chunk1[0].price.precision, 4);
1504 assert_eq!(chunk1[1].price.precision, 4);
1505 assert_eq!(chunk1[2].price.precision, 4);
1506
1507 let chunk2 = chunks[1].as_ref().unwrap();
1508 assert_eq!(chunk2.len(), 2);
1509 assert_eq!(chunk2[0].price.precision, 4);
1510 assert_eq!(chunk2[1].price.precision, 4);
1511
1512 assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
1513 assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);
1514
1515 let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1516 assert_eq!(total_trades, 5);
1517
1518 std::fs::remove_file(&temp_file).ok();
1519 }
1520
1521 #[rstest]
1522 pub fn test_stream_trades_with_zero_sized_trade() {
1523 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1525binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1526binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1527binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1528binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1529
1530 let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
1531 std::fs::write(&temp_file, csv_data).unwrap();
1532
1533 let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1534 let chunks: Vec<_> = stream.collect();
1535
1536 assert_eq!(chunks.len(), 1);
1538
1539 let chunk1 = chunks[0].as_ref().unwrap();
1540 assert_eq!(chunk1.len(), 3);
1541
1542 assert_eq!(chunk1[0].size, Quantity::from("1.0"));
1544 assert_eq!(chunk1[1].size, Quantity::from("1.5"));
1545 assert_eq!(chunk1[2].size, Quantity::from("3.0"));
1546
1547 assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
1549 assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
1550 assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));
1551
1552 std::fs::remove_file(&temp_file).ok();
1553 }
1554
1555 #[rstest]
1556 pub fn test_stream_depth10_from_snapshot5_chunked() {
1557 let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1558binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
1559binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
1560binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";
1561
1562 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
1564 std::fs::write(&temp_file, csv_data).unwrap();
1565
1566 let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
1568 let chunks: Vec<_> = stream.collect();
1569
1570 assert_eq!(chunks.len(), 2);
1572
1573 let chunk1 = chunks[0].as_ref().unwrap();
1575 assert_eq!(chunk1.len(), 2);
1576
1577 let chunk2 = chunks[1].as_ref().unwrap();
1579 assert_eq!(chunk2.len(), 1);
1580
1581 let first_depth = &chunk1[0];
1583 assert_eq!(first_depth.bids.len(), 10); assert_eq!(first_depth.asks.len(), 10);
1585
1586 assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
1588 assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));
1589
1590 let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1592 assert_eq!(total_depths, 3);
1593
1594 std::fs::remove_file(&temp_file).ok();
1596 }
1597
1598 #[rstest]
1599 pub fn test_stream_depth10_from_snapshot25_chunked() {
1600 let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];
1602
1603 let mut bid_headers = Vec::new();
1605 let mut ask_headers = Vec::new();
1606 for i in 0..25 {
1607 bid_headers.push(format!("bids[{i}].price"));
1608 bid_headers.push(format!("bids[{i}].amount"));
1609 }
1610 for i in 0..25 {
1611 ask_headers.push(format!("asks[{i}].price"));
1612 ask_headers.push(format!("asks[{i}].amount"));
1613 }
1614
1615 for header in &bid_headers {
1616 header_parts.push(header);
1617 }
1618 for header in &ask_headers {
1619 header_parts.push(header);
1620 }
1621
1622 let header = header_parts.join(",");
1623
1624 let mut row1_parts = vec![
1626 "binance".to_string(),
1627 "BTCUSDT".to_string(),
1628 "1640995200000000".to_string(),
1629 "1640995200100000".to_string(),
1630 ];
1631
1632 for i in 0..25 {
1634 if i < 5 {
1635 let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
1636 let bid_amount = 1.0 + f64::from(i);
1637 row1_parts.push(bid_price.to_string());
1638 row1_parts.push(bid_amount.to_string());
1639 } else {
1640 row1_parts.push(String::new());
1641 row1_parts.push(String::new());
1642 }
1643 }
1644
1645 for i in 0..25 {
1647 if i < 5 {
1648 let ask_price = f64::from(i).mul_add(0.01, 50000.0);
1649 let ask_amount = 1.0 + f64::from(i);
1650 row1_parts.push(ask_price.to_string());
1651 row1_parts.push(ask_amount.to_string());
1652 } else {
1653 row1_parts.push(String::new());
1654 row1_parts.push(String::new());
1655 }
1656 }
1657
1658 let csv_data = format!("{}\n{}", header, row1_parts.join(","));
1659
1660 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
1662 std::fs::write(&temp_file, &csv_data).unwrap();
1663
1664 let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
1666 let chunks: Vec<_> = stream.collect();
1667
1668 assert_eq!(chunks.len(), 1);
1670
1671 let chunk1 = chunks[0].as_ref().unwrap();
1672 assert_eq!(chunk1.len(), 1);
1673
1674 let depth = &chunk1[0];
1676 assert_eq!(depth.bids.len(), 10); assert_eq!(depth.asks.len(), 10);
1678
1679 let actual_bid_price = depth.bids[0].price;
1681 let actual_ask_price = depth.asks[0].price;
1682 assert!(actual_bid_price.as_f64() > 0.0);
1683 assert!(actual_ask_price.as_f64() > 0.0);
1684
1685 std::fs::remove_file(&temp_file).ok();
1687 }
1688
1689 #[rstest]
1690 pub fn test_stream_error_handling() {
1691 let non_existent = std::path::Path::new("does_not_exist.csv");
1693
1694 let result = stream_deltas(non_existent, 10, None, None, None, None);
1695 assert!(result.is_err());
1696
1697 let result = stream_quotes(non_existent, 10, None, None, None, None);
1698 assert!(result.is_err());
1699
1700 let result = stream_trades(non_existent, 10, None, None, None, None);
1701 assert!(result.is_err());
1702
1703 let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
1704 assert!(result.is_err());
1705
1706 let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
1707 assert!(result.is_err());
1708 }
1709
1710 #[rstest]
1711 pub fn test_stream_empty_file() {
1712 let temp_file = std::env::temp_dir().join("test_empty.csv");
1714 std::fs::write(&temp_file, "").unwrap();
1715
1716 let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
1717 let chunks: Vec<_> = stream.collect();
1718 assert_eq!(chunks.len(), 0);
1719
1720 std::fs::remove_file(&temp_file).ok();
1722 }
1723
1724 #[rstest]
1725 pub fn test_stream_precision_consistency() {
1726 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1728binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1729binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1730binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1731binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";
1732
1733 let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
1734 std::fs::write(&temp_file, csv_data).unwrap();
1735
1736 let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
1738
1739 let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
1741 let chunks: Vec<_> = stream.collect();
1742 let streamed_deltas: Vec<_> = chunks
1743 .into_iter()
1744 .flat_map(|chunk| chunk.unwrap())
1745 .collect();
1746
1747 assert_eq!(bulk_deltas.len(), streamed_deltas.len());
1749
1750 for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
1752 assert_eq!(bulk.instrument_id, streamed.instrument_id);
1753 assert_eq!(bulk.action, streamed.action);
1754 assert_eq!(bulk.order.side, streamed.order.side);
1755 assert_eq!(bulk.ts_event, streamed.ts_event);
1756 assert_eq!(bulk.ts_init, streamed.ts_init);
1757 }
1759
1760 std::fs::remove_file(&temp_file).ok();
1762 }
1763
1764 #[rstest]
1765 pub fn test_stream_trades_from_local_file() {
1766 let filepath = get_test_data_path("csv/trades_1.csv");
1767 let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();
1768
1769 let chunk1 = stream.next().unwrap().unwrap();
1770 assert_eq!(chunk1.len(), 1);
1771 assert_eq!(chunk1[0].price, Price::from("8531.5"));
1772
1773 let chunk2 = stream.next().unwrap().unwrap();
1774 assert_eq!(chunk2.len(), 1);
1775 assert_eq!(chunk2[0].size, Quantity::from("1000"));
1776
1777 assert!(stream.next().is_none());
1778 }
1779
1780 #[rstest]
1781 pub fn test_stream_deltas_from_local_file() {
1782 let filepath = get_test_data_path("csv/deltas_1.csv");
1783 let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();
1784
1785 let chunk1 = stream.next().unwrap().unwrap();
1786 assert_eq!(chunk1.len(), 1);
1787 assert_eq!(chunk1[0].order.price, Price::from("6421.5"));
1788
1789 let chunk2 = stream.next().unwrap().unwrap();
1790 assert_eq!(chunk2.len(), 1);
1791 assert_eq!(chunk2[0].order.size, Quantity::from("10000"));
1792
1793 assert!(stream.next().is_none());
1794 }
1795
1796 #[rstest]
1797 pub fn test_stream_deltas_with_limit() {
1798 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1799binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
1800binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
1801binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
1802binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
1803binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";
1804
1805 let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
1806 std::fs::write(&temp_file, csv_data).unwrap();
1807
1808 let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
1810 let chunks: Vec<_> = stream.collect();
1811
1812 assert_eq!(chunks.len(), 2);
1814 let chunk1 = chunks[0].as_ref().unwrap();
1815 assert_eq!(chunk1.len(), 2);
1816 let chunk2 = chunks[1].as_ref().unwrap();
1817 assert_eq!(chunk2.len(), 1);
1818
1819 let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1821 assert_eq!(total_deltas, 3);
1822
1823 std::fs::remove_file(&temp_file).ok();
1824 }
1825
1826 #[rstest]
1827 pub fn test_stream_quotes_with_limit() {
1828 let csv_data =
1829 "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
1830binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
1831binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
1832binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
1833binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";
1834
1835 let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
1836 std::fs::write(&temp_file, csv_data).unwrap();
1837
1838 let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
1840 let chunks: Vec<_> = stream.collect();
1841
1842 assert_eq!(chunks.len(), 1);
1844 let chunk1 = chunks[0].as_ref().unwrap();
1845 assert_eq!(chunk1.len(), 2);
1846
1847 let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1849 assert_eq!(total_quotes, 2);
1850
1851 std::fs::remove_file(&temp_file).ok();
1852 }
1853
1854 #[rstest]
1855 pub fn test_stream_trades_with_limit() {
1856 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1857binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1858binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
1859binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1860binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
1861binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
1862
1863 let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
1864 std::fs::write(&temp_file, csv_data).unwrap();
1865
1866 let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
1868 let chunks: Vec<_> = stream.collect();
1869
1870 assert_eq!(chunks.len(), 2);
1872 let chunk1 = chunks[0].as_ref().unwrap();
1873 assert_eq!(chunk1.len(), 2);
1874 let chunk2 = chunks[1].as_ref().unwrap();
1875 assert_eq!(chunk2.len(), 1);
1876
1877 let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1879 assert_eq!(total_trades, 3);
1880
1881 std::fs::remove_file(&temp_file).ok();
1882 }
1883}