nautilus_tardis/csv/stream.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{io::Read, path::Path};

use csv::{Reader, StringRecord};
use nautilus_core::UnixNanos;
use nautilus_model::{
    data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
    enums::{BookAction, OrderSide, RecordFlag},
    identifiers::InstrumentId,
    types::Quantity,
};
#[cfg(feature = "python")]
use nautilus_model::{
    data::{Data, OrderBookDeltas, OrderBookDeltas_API},
    python::data::data_to_pycapsule,
};
#[cfg(feature = "python")]
use pyo3::{PyObject, Python};

use crate::{
    csv::{
        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
        record::{
            TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
        },
    },
    parse::{parse_instrument_id, parse_timestamp},
};

////////////////////////////////////////////////////////////////////////////////
// OrderBookDelta Streaming
////////////////////////////////////////////////////////////////////////////////

/// Streaming iterator over CSV records that yields chunks of parsed data.
struct DeltaStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDelta>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    limit: Option<usize>,
    records_processed: usize,
}

impl DeltaStreamIterator {
    /// Creates a new [`DeltaStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

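        // Scan up to `sample_size` parseable records, tracking the maximum number of
        // decimal places observed for price and amount (e.g. 49999.123 -> 3); any
        // precision supplied explicitly by the caller takes priority over these values.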
        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => {
                            let delta = parse_delta_record(
                                &data,
                                self.price_precision,
                                self.size_precision,
                                self.instrument_id,
                            );

                            // Check if timestamp is different from last timestamp
                            if self.last_ts_event != delta.ts_event
                                && let Some(last_delta) = self.buffer.last_mut()
                            {
                                last_delta.flags = RecordFlag::F_LAST.value();
                            }

                            assert!(
                                !(delta.action != BookAction::Delete && delta.order.size.is_zero()),
                                "Invalid delta: action {} when size zero, check size_precision ({}) vs data; {data:?}",
                                delta.action,
                                self.size_precision
                            );

                            self.last_ts_event = delta.ts_event;

                            self.buffer.push(delta);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    }
                }
                Ok(false) => {
                    // End of file reached
                    if self.buffer.is_empty() {
                        return None;
                    }
                    // Set F_LAST flag for final delta in chunk
                    if let Some(last_delta) = self.buffer.last_mut() {
                        last_delta.flags = RecordFlag::F_LAST.value();
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When explicit precisions are not provided, they are inferred from a sample taken at the
/// start of the file (up to the first 10,000 records), so the result may differ from bulk
/// loading the entire file when later records carry more decimal places. For deterministic
/// precision behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
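///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size below are illustrative):
///
/// ```ignore
/// let stream = stream_deltas("incremental_book_L2.csv", 100_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let deltas: Vec<OrderBookDelta> = chunk?;
///     // Process the chunk of deltas...
/// }
/// ```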
pub fn stream_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
    DeltaStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Vec<PyObject> (OrderBookDeltas as PyCapsule) Streaming
////////////////////////////////////////////////////////////////////////////////

#[cfg(feature = "python")]
/// Streaming iterator over CSV records that yields chunks of parsed data.
struct BatchedDeltasStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<PyObject>,
    current_batch: Vec<OrderBookDelta>,
    pending_batches: Vec<Vec<OrderBookDelta>>,
    chunk_size: usize,
    instrument_id: InstrumentId,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    limit: Option<usize>,
    records_processed: usize,
}

#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Creates a new [`BatchedDeltasStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        // Read the first record to get instrument_id
        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect from a sample of the following records
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<PyObject>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut batches_created = 0;

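        // Note: `chunk_size` here counts completed batches (one batch per distinct
        // `ts_event`), not individual delta records.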
        while batches_created < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let delta = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => parse_delta_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            Some(self.instrument_id),
                        ),
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    if self.last_ts_event != delta.ts_event && !self.current_batch.is_empty() {
                        // Set F_LAST on the last delta of the completed batch
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = delta.ts_event;
                    self.current_batch.push(delta);
                    self.records_processed += 1;

                    if let Some(limit) = self.limit
                        && self.records_processed >= limit
                    {
                        break;
                    }
                }
                Ok(false) => {
                    // End of file
                    break;
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            // Ensure the last delta of the last batch has F_LAST set
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            // Create all capsules in a single GIL acquisition
            Python::with_gil(|py| {
                for batch in self.pending_batches.drain(..) {
                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
                    let deltas = OrderBookDeltas_API::new(deltas);
                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
                    self.buffer.push(capsule);
                }
            });
            Some(Ok(std::mem::take(&mut self.buffer)))
        }
    }
}

#[cfg(feature = "python")]
/// Streams [`Vec<PyObject>`]s (`PyCapsule`) from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
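///
/// # Example
///
/// A minimal usage sketch (requires the `python` feature; the file path and chunk size are
/// illustrative). Each yielded `PyObject` is a `PyCapsule` wrapping one `OrderBookDeltas` batch:
///
/// ```ignore
/// let stream = stream_batched_deltas("incremental_book_L2.csv", 1_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let capsules: Vec<PyObject> = chunk?;
///     // Hand the capsules across the Python boundary...
/// }
/// ```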
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<PyObject>>>> {
    BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Quote Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`QuoteTick`]s from a Tardis CSV file in chunks.
struct QuoteStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<QuoteTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl QuoteStreamIterator {
    /// Creates a new [`QuoteStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
                        if let Some(bid_price_val) = data.bid_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price_val));
                        }
                        if let Some(ask_price_val) = data.ask_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price_val));
                        }
                        if let Some(bid_amount_val) = data.bid_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount_val));
                        }
                        if let Some(ask_amount_val) = data.ask_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount_val));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for QuoteStreamIterator {
    type Item = anyhow::Result<Vec<QuoteTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
                    Ok(data) => {
                        let quote = parse_quote_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            self.instrument_id,
                        );

                        self.buffer.push(quote);
                        records_read += 1;
                        self.records_processed += 1;

                        if let Some(limit) = self.limit
                            && self.records_processed >= limit
                        {
                            break;
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When explicit precisions are not provided, they are inferred from a sample taken at the
/// start of the file (up to the first 10,000 records), so the result may differ from bulk
/// loading the entire file when later records carry more decimal places. For deterministic
/// precision behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
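///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size are illustrative):
///
/// ```ignore
/// let stream = stream_quotes("quotes.csv", 100_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let quotes: Vec<QuoteTick> = chunk?;
///     // Process the chunk of quotes...
/// }
/// ```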
pub fn stream_quotes<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
    QuoteStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Trade Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`TradeTick`]s from a Tardis CSV file in chunks.
struct TradeStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<TradeTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl TradeStreamIterator {
    /// Creates a new [`TradeStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for TradeStreamIterator {
    type Item = anyhow::Result<Vec<TradeTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
                    Ok(data) => {
                        let size = Quantity::new(data.amount, self.size_precision);

                        if size.is_positive() {
                            let trade = parse_trade_record(
                                &data,
                                size,
                                self.price_precision,
                                self.instrument_id,
                            );

                            self.buffer.push(trade);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        } else {
                            log::warn!("Skipping zero-sized trade: {data:?}");
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When explicit precisions are not provided, they are inferred from a sample taken at the
/// start of the file (up to the first 10,000 records), so the result may differ from bulk
/// loading the entire file when later records carry more decimal places. For deterministic
/// precision behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
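///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size are illustrative); zero-sized
/// trades are skipped with a warning:
///
/// ```ignore
/// let stream = stream_trades("trades.csv", 100_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let trades: Vec<TradeTick> = chunk?;
///     // Process the chunk of trades...
/// }
/// ```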
pub fn stream_trades<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
    TradeStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Depth10 Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`OrderBookDepth10`]s from a Tardis CSV file in chunks.
struct Depth10StreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDepth10>,
    chunk_size: usize,
    levels: u8,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl Depth10StreamIterator {
    /// Creates a new [`Depth10StreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 5 levels from snapshot5 data
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 10 levels from snapshot25 data
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    // Try to deserialize as snapshot5 record first
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for Depth10StreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDepth10>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = match self.levels {
                        5 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot5Record>(None)
                            .map(|data| self.process_snapshot5(data)),
                        25 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot25Record>(None)
                            .map(|data| self.process_snapshot25(data)),
                        _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
                    };

                    match result {
                        Ok(depth) => {
                            self.buffer.push(depth);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When explicit precisions are not provided, they are inferred from a sample taken at the
/// start of the file (up to the first 10,000 records), so the result may differ from bulk
/// loading the entire file when later records carry more decimal places. For deterministic
/// precision behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
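///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size are illustrative); the remaining
/// five levels of each [`OrderBookDepth10`] are filled with null orders:
///
/// ```ignore
/// let stream = stream_depth10_from_snapshot5("book_snapshot_5.csv", 10_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let depths: Vec<OrderBookDepth10> = chunk?;
///     // Process the chunk of depth snapshots...
/// }
/// ```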
pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        5,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When explicit precisions are not provided, they are inferred from a sample taken at the
/// start of the file (up to the first 10,000 records), so the result may differ from bulk
/// loading the entire file when later records carry more decimal places. For deterministic
/// precision behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
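///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size are illustrative); only the first
/// ten of the twenty-five levels are retained in each [`OrderBookDepth10`]:
///
/// ```ignore
/// let stream = stream_depth10_from_snapshot25("book_snapshot_25.csv", 10_000, Some(1), Some(0), None, None)?;
/// for chunk in stream {
///     let depths: Vec<OrderBookDepth10> = chunk?;
///     // Process the chunk of depth snapshots...
/// }
/// ```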
pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        25,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// FundingRateUpdate Streaming
////////////////////////////////////////////////////////////////////////////////

use nautilus_model::data::FundingRateUpdate;

use crate::csv::record::TardisDerivativeTickerRecord;

/// An iterator for streaming [`FundingRateUpdate`]s from a Tardis CSV file in chunks.
struct FundingRateStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<FundingRateUpdate>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
    records_processed: usize,
}

impl FundingRateStreamIterator {
    /// Creates a new [`FundingRateStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            limit,
            records_processed: 0,
        })
    }
}

impl Iterator for FundingRateStreamIterator {
    type Item = anyhow::Result<Vec<FundingRateUpdate>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = self
                        .record
                        .deserialize::<TardisDerivativeTickerRecord>(None)
                        .map_err(anyhow::Error::from)
                        .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));

                    match result {
                        Ok(Some(funding_rate)) => {
                            self.buffer.push(funding_rate);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Ok(None) => {
                            // Skip this record as it has no funding data
                            self.records_processed += 1;
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!(
                                "Failed to parse funding rate record: {e}"
                            )));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`FundingRateUpdate`]s from a Tardis derivative ticker CSV file,
/// yielding chunks of the specified size.
///
/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
/// fields from derivative ticker data to create funding rate updates.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
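///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size are illustrative); ticker rows
/// without funding data are skipped:
///
/// ```ignore
/// let stream = stream_funding_rates("derivative_ticker.csv", 100_000, None, None)?;
/// for chunk in stream {
///     let updates: Vec<FundingRateUpdate> = chunk?;
///     // Process the chunk of funding rate updates...
/// }
/// ```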
1373pub fn stream_funding_rates<P: AsRef<Path>>(
1374    filepath: P,
1375    chunk_size: usize,
1376    instrument_id: Option<InstrumentId>,
1377    limit: Option<usize>,
1378) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
1379    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
1380}
1381
1382////////////////////////////////////////////////////////////////////////////////
1383// Tests
1384////////////////////////////////////////////////////////////////////////////////
1385#[cfg(test)]
1386mod tests {
1387    use nautilus_model::{
1388        enums::AggressorSide,
1389        identifiers::TradeId,
1390        types::{Price, Quantity},
1391    };
1392    use rstest::*;
1393
1394    use super::*;
1395    use crate::{csv::load::load_deltas, parse::parse_price, tests::get_test_data_path};
1396
1397    #[rstest]
1398    #[case(0.0, 0)]
1399    #[case(42.0, 0)]
1400    #[case(0.1, 1)]
1401    #[case(0.25, 2)]
1402    #[case(123.0001, 4)]
1403    #[case(-42.987654321,       9)]
1404    #[case(1.234_567_890_123, 12)]
1405    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
1406        assert_eq!(infer_precision(input), expected);
1407    }
1408
    #[rstest]
    pub fn test_stream_deltas_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].order.price.precision, 4);
        assert_eq!(chunk1[1].order.price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].order.price.precision, 4);
        assert_eq!(chunk2[1].order.price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].order.price.precision, 4);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_quotes_chunked() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";

        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].bid_price.precision, 4);
        assert_eq!(chunk1[1].bid_price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].bid_price.precision, 4);
        assert_eq!(chunk2[1].bid_price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].bid_price.precision, 4);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0].price.precision, 4);
        assert_eq!(chunk1[1].price.precision, 4);
        assert_eq!(chunk1[2].price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].price.precision, 4);
        assert_eq!(chunk2[1].price.precision, 4);

        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_with_zero_sized_trade() {
        // Test CSV data with one zero-sized trade that should be skipped
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 3 valid trades (zero-sized trade skipped)
        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);

        // Verify the trades are the correct ones (not the zero-sized one)
        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
        assert_eq!(chunk1[2].size, Quantity::from("3.0"));

        // Verify trade IDs to confirm correct trades were loaded
        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_depth10_from_snapshot5_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";

        // Write to temporary file
        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Stream with chunk size of 2
        let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item]
        assert_eq!(chunks.len(), 2);

        // First chunk: 2 depth snapshots
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        // Second chunk: 1 depth snapshot
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Verify depth structure
        let first_depth = &chunk1[0];
        assert_eq!(first_depth.bids.len(), 10); // Should have 10 levels
        assert_eq!(first_depth.asks.len(), 10);

        // Verify some specific prices
        assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
        assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));

        // Verify total count
        let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_depths, 3);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_depth10_from_snapshot25_chunked() {
        // Create minimal snapshot25 CSV data (only the first few levels populated for testing)
        let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];

        // Add bid and ask levels (we'll only populate first few for testing)
        let mut bid_headers = Vec::new();
        let mut ask_headers = Vec::new();
        for i in 0..25 {
            bid_headers.push(format!("bids[{i}].price"));
            bid_headers.push(format!("bids[{i}].amount"));
        }
        for i in 0..25 {
            ask_headers.push(format!("asks[{i}].price"));
            ask_headers.push(format!("asks[{i}].amount"));
        }

        for header in &bid_headers {
            header_parts.push(header);
        }
        for header in &ask_headers {
            header_parts.push(header);
        }

        let header = header_parts.join(",");

        // Create a row with data for first 5 levels (rest will be empty)
        let mut row1_parts = vec![
            "binance".to_string(),
            "BTCUSDT".to_string(),
            "1640995200000000".to_string(),
            "1640995200100000".to_string(),
        ];

        // Add bid levels (first 5 with data, rest empty)
        for i in 0..25 {
            if i < 5 {
                let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
                let bid_amount = 1.0 + f64::from(i);
                row1_parts.push(bid_price.to_string());
                row1_parts.push(bid_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        // Add ask levels (first 5 with data, rest empty)
        for i in 0..25 {
            if i < 5 {
                let ask_price = f64::from(i).mul_add(0.01, 50000.0);
                let ask_amount = 1.0 + f64::from(i);
                row1_parts.push(ask_price.to_string());
                row1_parts.push(ask_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        let csv_data = format!("{}\n{}", header, row1_parts.join(","));

        // Write to temporary file
        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
        std::fs::write(&temp_file, &csv_data).unwrap();

        // Stream with chunk size of 1
        let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 1 item
        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 1);

        // Verify depth structure
        let depth = &chunk1[0];
        assert_eq!(depth.bids.len(), 10); // Should have 10 levels
        assert_eq!(depth.asks.len(), 10);

        // Verify first level has data - check whatever we actually get
        let actual_bid_price = depth.bids[0].price;
        let actual_ask_price = depth.asks[0].price;
        assert!(actual_bid_price.as_f64() > 0.0);
        assert!(actual_ask_price.as_f64() > 0.0);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_error_handling() {
        // Test with non-existent file
        let non_existent = std::path::Path::new("does_not_exist.csv");

        let result = stream_deltas(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_quotes(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_trades(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
        assert!(result.is_err());
    }

    #[rstest]
    pub fn test_stream_empty_file() {
        // Test with empty CSV file
        let temp_file = std::env::temp_dir().join("test_empty.csv");
        std::fs::write(&temp_file, "").unwrap();

        let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();
        assert_eq!(chunks.len(), 0);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_precision_consistency() {
        // Test that streaming produces the same results as bulk loading for precision inference
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Load all at once
        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        // Stream in chunks and collect
        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();
        let streamed_deltas: Vec<_> = chunks
            .into_iter()
            .flat_map(|chunk| chunk.unwrap())
            .collect();

        // Should have the same number of deltas
        assert_eq!(bulk_deltas.len(), streamed_deltas.len());

        // Compare key properties (precision inference will be different due to chunking)
        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
            assert_eq!(bulk.instrument_id, streamed.instrument_id);
            assert_eq!(bulk.action, streamed.action);
            assert_eq!(bulk.order.side, streamed.order.side);
            assert_eq!(bulk.ts_event, streamed.ts_event);
            assert_eq!(bulk.ts_init, streamed.ts_init);
            // Note: precision may differ between bulk and streaming due to chunk boundaries
        }

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_from_local_file() {
        let filepath = get_test_data_path("csv/trades_1.csv");
        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].price, Price::from("8531.5"));

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].size, Quantity::from("1000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    pub fn test_stream_deltas_from_local_file() {
        let filepath = get_test_data_path("csv/deltas_1.csv");
        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].order.price, Price::from("6421.5"));

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].order.size, Quantity::from("10000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    pub fn test_stream_deltas_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 3 records
        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Total should be exactly 3 records due to limit
        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_quotes_with_limit() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";

        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 2 records
        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 2 items (limited)
        assert_eq!(chunks.len(), 1);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        // Verify we get exactly 2 records
        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 2);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    pub fn test_stream_trades_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 3 records
        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Verify we get exactly 3 records
        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 3);

        std::fs::remove_file(&temp_file).ok();
    }
}