nautilus_tardis/csv/
stream.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{io::Read, path::Path};

use csv::{Reader, StringRecord};
use nautilus_core::UnixNanos;
use nautilus_model::{
    data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
    enums::{OrderSide, RecordFlag},
    identifiers::InstrumentId,
    types::Quantity,
};
#[cfg(feature = "python")]
use nautilus_model::{
    data::{Data, OrderBookDeltas, OrderBookDeltas_API},
    python::data::data_to_pycapsule,
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, Python};

use crate::{
    csv::{
        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
        record::{
            TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
        },
    },
    parse::{parse_instrument_id, parse_timestamp},
};

////////////////////////////////////////////////////////////////////////////////
// OrderBookDelta Streaming
////////////////////////////////////////////////////////////////////////////////

/// Streaming iterator over CSV records that yields chunks of parsed data.
struct DeltaStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDelta>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    last_is_snapshot: bool,
    limit: Option<usize>,
    deltas_emitted: usize,

    /// Pending record to process in next iteration (when CLEAR filled the chunk).
    pending_record: Option<TardisBookUpdateRecord>,
}

impl DeltaStreamIterator {
    /// Creates a new [`DeltaStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
            pending_record: None,
        })
    }

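    /// Infers price and size precision by scanning up to `sample_size` records.
    ///
    /// Records that fail to parse are skipped; scanning stops early at end of file.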
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        loop {
            if self.buffer.len() >= self.chunk_size {
                break;
            }

            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            // Use pending record from previous iteration, or read new
            let data = if let Some(pending) = self.pending_record.take() {
                pending
            } else {
                match self.reader.read_record(&mut self.record) {
                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    },
                    Ok(false) => {
                        if self.buffer.is_empty() {
                            return None;
                        }
                        if let Some(last_delta) = self.buffer.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        return Some(Ok(self.buffer.clone()));
                    }
                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
                }
            };

            // Insert CLEAR on snapshot boundary to reset order book state
            if data.is_snapshot && !self.last_is_snapshot {
                let clear_instrument_id = self
                    .instrument_id
                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
                let ts_event = parse_timestamp(data.timestamp);
                let ts_init = parse_timestamp(data.local_timestamp);

                if self.last_ts_event != ts_event
                    && let Some(last_delta) = self.buffer.last_mut()
                {
                    last_delta.flags = RecordFlag::F_LAST.value();
                }
                self.last_ts_event = ts_event;

                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
                self.buffer.push(clear_delta);
                self.deltas_emitted += 1;

                // Defer real delta to next chunk if constraints reached
                if self.buffer.len() >= self.chunk_size
                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
                {
                    self.last_is_snapshot = data.is_snapshot;
                    self.pending_record = Some(data);
                    break;
                }
            }
            self.last_is_snapshot = data.is_snapshot;

            let delta = match parse_delta_record(
                &data,
                self.price_precision,
                self.size_precision,
                self.instrument_id,
            ) {
                Ok(d) => d,
                Err(e) => {
                    tracing::warn!("Skipping invalid delta record: {e}");
                    continue;
                }
            };

            if self.last_ts_event != delta.ts_event
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }

            self.last_ts_event = delta.ts_event;

            self.buffer.push(delta);
            self.deltas_emitted += 1;
        }

        if self.buffer.is_empty() {
            None
        } else {
            // Only set F_LAST when limit reached (stream ending), not on chunk
            // boundary where more same-timestamp deltas may follow
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When streaming with precision inference (explicit precisions not provided), the inferred
/// precision may differ from bulk loading the entire file: precision is inferred from an
/// initial sample of records, and later records may require higher precision. For
/// deterministic behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
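///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size below are illustrative):
///
/// ```ignore
/// // Stream deltas in chunks of 100,000, inferring precisions from the data
/// let stream = stream_deltas("book_updates.csv", 100_000, None, None, None, None)?;
/// for chunk in stream {
///     let deltas: Vec<OrderBookDelta> = chunk?;
///     // Process the chunk of deltas...
/// }
/// ```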
pub fn stream_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
    DeltaStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Vec<Py<PyAny>> (OrderBookDeltas as PyCapsule) Streaming
////////////////////////////////////////////////////////////////////////////////

#[cfg(feature = "python")]
/// Streaming iterator over CSV records that yields chunks of parsed data.
struct BatchedDeltasStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<Py<PyAny>>,
    current_batch: Vec<OrderBookDelta>,
    pending_batches: Vec<Vec<OrderBookDelta>>,
    chunk_size: usize,
    instrument_id: InstrumentId,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    limit: Option<usize>,
    records_processed: usize,
}

#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Creates a new [`BatchedDeltasStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect from sample including first record
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<Py<PyAny>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut batches_created = 0;

        while batches_created < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let delta = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => match parse_delta_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            Some(self.instrument_id),
                        ) {
                            Ok(d) => d,
                            Err(e) => {
                                tracing::warn!("Skipping invalid delta record: {e}");
                                continue;
                            }
                        },
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    if self.last_ts_event != delta.ts_event && !self.current_batch.is_empty() {
                        // Set F_LAST on the last delta of the completed batch
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = delta.ts_event;
                    self.current_batch.push(delta);
                    self.records_processed += 1;

                    if let Some(limit) = self.limit
                        && self.records_processed >= limit
                    {
                        break;
                    }
                }
                Ok(false) => {
                    // End of file
                    break;
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            // Ensure the last delta of the last batch has F_LAST set
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            // Create all capsules in a single GIL acquisition
            Python::attach(|py| {
                for batch in self.pending_batches.drain(..) {
                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
                    let deltas = OrderBookDeltas_API::new(deltas);
                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
                    self.buffer.push(capsule);
                }
            });
            Some(Ok(std::mem::take(&mut self.buffer)))
        }
    }
}

#[cfg(feature = "python")]
/// Streams batches of [`OrderBookDeltas`] as `PyCapsule` objects (`Vec<Py<PyAny>>`) from a
/// Tardis format CSV at the given `filepath`, yielding chunks of the specified size.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
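///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size below are illustrative):
///
/// ```ignore
/// // Each item in a yielded chunk is a PyCapsule wrapping `OrderBookDeltas`
/// let stream = stream_batched_deltas("book_updates.csv", 1_000, None, None, None, None)?;
/// for chunk in stream {
///     let capsules: Vec<Py<PyAny>> = chunk?;
///     // Hand the capsules over to Python...
/// }
/// ```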
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
    BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Quote Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`QuoteTick`]s from a Tardis CSV file in chunks.
struct QuoteStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<QuoteTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl QuoteStreamIterator {
    /// Creates a new [`QuoteStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
                        if let Some(bid_price_val) = data.bid_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price_val));
                        }
                        if let Some(ask_price_val) = data.ask_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price_val));
                        }
                        if let Some(bid_amount_val) = data.bid_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount_val));
                        }
                        if let Some(ask_amount_val) = data.ask_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount_val));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for QuoteStreamIterator {
    type Item = anyhow::Result<Vec<QuoteTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
                    Ok(data) => {
                        let quote = parse_quote_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            self.instrument_id,
                        );

                        self.buffer.push(quote);
                        records_read += 1;
                        self.records_processed += 1;

                        if let Some(limit) = self.limit
                            && self.records_processed >= limit
                        {
                            break;
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When streaming with precision inference (explicit precisions not provided), the inferred
/// precision may differ from bulk loading the entire file: precision is inferred from an
/// initial sample of records, and later records may require higher precision. For
/// deterministic behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
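///
/// # Example
///
/// A minimal usage sketch (the file path, chunk size, and precisions below are illustrative):
///
/// ```ignore
/// let stream = stream_quotes("quotes.csv", 100_000, Some(2), Some(0), None, None)?;
/// for chunk in stream {
///     let quotes: Vec<QuoteTick> = chunk?;
///     // Process the chunk of quotes...
/// }
/// ```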
pub fn stream_quotes<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
    QuoteStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Trade Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`TradeTick`]s from a Tardis CSV file in chunks.
struct TradeStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<TradeTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl TradeStreamIterator {
    /// Creates a new [`TradeStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for TradeStreamIterator {
    type Item = anyhow::Result<Vec<TradeTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
                    Ok(data) => {
                        let size = Quantity::new(data.amount, self.size_precision);

                        if size.is_positive() {
                            let trade = parse_trade_record(
                                &data,
                                size,
                                self.price_precision,
                                self.instrument_id,
                            );

                            self.buffer.push(trade);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        } else {
                            log::warn!("Skipping zero-sized trade: {data:?}");
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When streaming with precision inference (explicit precisions not provided), the inferred
/// precision may differ from bulk loading the entire file: precision is inferred from an
/// initial sample of records, and later records may require higher precision. For
/// deterministic behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
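///
/// # Example
///
/// A minimal usage sketch (the file path, chunk size, and limit below are illustrative):
///
/// ```ignore
/// // Limit the stream to the first 1,000,000 trades
/// let stream = stream_trades("trades.csv", 100_000, None, None, None, Some(1_000_000))?;
/// for chunk in stream {
///     let trades: Vec<TradeTick> = chunk?;
///     // Process the chunk of trades...
/// }
/// ```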
pub fn stream_trades<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
    TradeStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Depth10 Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`OrderBookDepth10`]s from a Tardis CSV file in chunks.
struct Depth10StreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDepth10>,
    chunk_size: usize,
    levels: u8,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl Depth10StreamIterator {
    /// Creates a new [`Depth10StreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, or if `levels` is not 5 or 25.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            levels == 5 || levels == 25,
            "Invalid levels: {levels}. Must be 5 or 25."
        );

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

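    /// Converts a 5-level snapshot record into an [`OrderBookDepth10`], leaving levels 5-9 as
    /// [`NULL_ORDER`] with zero counts.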
    fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 5 levels from snapshot5 data
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

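    /// Converts a 25-level snapshot record into an [`OrderBookDepth10`], keeping only the first
    /// 10 levels.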
    fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 10 levels from snapshot25 data
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> anyhow::Result<(u8, u8)> {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    // Try to deserialize as snapshot5 record first
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }
                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }
                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }
                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        Ok((max_price_precision, max_size_precision))
    }
}

impl Iterator for Depth10StreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDepth10>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = match self.levels {
                        5 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot5Record>(None)
                            .map(|data| self.process_snapshot5(data)),
                        25 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot25Record>(None)
                            .map(|data| self.process_snapshot25(data)),
                        _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
                    };

                    match result {
                        Ok(depth) => {
                            self.buffer.push(depth);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When streaming with precision inference (explicit precisions not provided), the inferred
/// precision may differ from bulk loading the entire file: precision is inferred from an
/// initial sample of records, and later records may require higher precision. For
/// deterministic behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
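///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size below are illustrative):
///
/// ```ignore
/// let stream = stream_depth10_from_snapshot5("book_snapshot_5.csv", 10_000, None, None, None, None)?;
/// for chunk in stream {
///     let depths: Vec<OrderBookDepth10> = chunk?;
///     // Process the chunk of depth snapshots...
/// }
/// ```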
pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        5,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When streaming with precision inference (explicit precisions not provided), the inferred
/// precision may differ from bulk loading the entire file: precision is inferred from an
/// initial sample of records, and later records may require higher precision. For
/// deterministic behavior, provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        25,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// FundingRateUpdate Streaming
////////////////////////////////////////////////////////////////////////////////

use nautilus_model::data::FundingRateUpdate;

use crate::csv::record::TardisDerivativeTickerRecord;

/// An iterator for streaming [`FundingRateUpdate`]s from a Tardis CSV file in chunks.
struct FundingRateStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<FundingRateUpdate>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
    records_processed: usize,
}

impl FundingRateStreamIterator {
    /// Creates a new [`FundingRateStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            limit,
            records_processed: 0,
        })
    }
}

impl Iterator for FundingRateStreamIterator {
    type Item = anyhow::Result<Vec<FundingRateUpdate>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = self
                        .record
                        .deserialize::<TardisDerivativeTickerRecord>(None)
                        .map_err(anyhow::Error::from)
                        .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));

                    match result {
                        Ok(Some(funding_rate)) => {
                            self.buffer.push(funding_rate);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Ok(None) => {
                            // Skip this record as it has no funding data
                            self.records_processed += 1;
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!(
                                "Failed to parse funding rate record: {e}"
                            )));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`FundingRateUpdate`]s from a Tardis derivative ticker CSV file,
/// yielding chunks of the specified size.
///
/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
/// fields from derivative ticker data to create funding rate updates.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
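///
/// # Example
///
/// A minimal usage sketch (the file path and chunk size below are illustrative):
///
/// ```ignore
/// let stream = stream_funding_rates("derivative_ticker.csv", 10_000, None, None)?;
/// for chunk in stream {
///     let updates: Vec<FundingRateUpdate> = chunk?;
///     // Process the chunk of funding rate updates...
/// }
/// ```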
1428pub fn stream_funding_rates<P: AsRef<Path>>(
1429    filepath: P,
1430    chunk_size: usize,
1431    instrument_id: Option<InstrumentId>,
1432    limit: Option<usize>,
1433) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
1434    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439    use nautilus_model::{
1440        enums::{AggressorSide, BookAction},
1441        identifiers::TradeId,
1442        types::Price,
1443    };
1444    use rstest::*;
1445
1446    use super::*;
1447    use crate::{csv::load::load_deltas, parse::parse_price, tests::get_test_data_path};
1448
1449    #[rstest]
1450    #[case(0.0, 0)]
1451    #[case(42.0, 0)]
1452    #[case(0.1, 1)]
1453    #[case(0.25, 2)]
1454    #[case(123.0001, 4)]
1455    #[case(-42.987654321, 9)]
1456    #[case(1.234_567_890_123, 12)]
1457    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
1458        assert_eq!(infer_precision(input), expected);
1459    }
1460
1461    #[rstest]
1462    pub fn test_stream_deltas_chunked() {
1463        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1464binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1465binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1466binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1467binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1468binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1469
1470        let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
1471        std::fs::write(&temp_file, csv_data).unwrap();
1472
1473        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1474        let chunks: Vec<_> = stream.collect();
1475
1476        // 5 data rows + 1 CLEAR = 6 deltas, in chunks of 2
1477        assert_eq!(chunks.len(), 3);
1478
1479        let chunk1 = chunks[0].as_ref().unwrap();
1480        assert_eq!(chunk1.len(), 2);
1481        assert_eq!(chunk1[0].action, BookAction::Clear); // CLEAR first
1482        assert_eq!(chunk1[1].order.price.precision, 4); // First data delta
1483
1484        let chunk2 = chunks[1].as_ref().unwrap();
1485        assert_eq!(chunk2.len(), 2);
1486        assert_eq!(chunk2[0].order.price.precision, 4);
1487        assert_eq!(chunk2[1].order.price.precision, 4);
1488
1489        let chunk3 = chunks[2].as_ref().unwrap();
1490        assert_eq!(chunk3.len(), 2);
1491        assert_eq!(chunk3[0].order.price.precision, 4);
1492        assert_eq!(chunk3[1].order.price.precision, 4);
1493
1494        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1495        assert_eq!(total_deltas, 6);
1496
1497        std::fs::remove_file(&temp_file).ok();
1498    }
1499
1500    #[rstest]
1501    pub fn test_stream_quotes_chunked() {
1502        let csv_data =
1503            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
1504binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
1505binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
1506binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
1507binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
1508binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";
1509
1510        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
1511        std::fs::write(&temp_file, csv_data).unwrap();
1512
1513        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1514        let chunks: Vec<_> = stream.collect();
1515
1516        assert_eq!(chunks.len(), 3);
1517
1518        let chunk1 = chunks[0].as_ref().unwrap();
1519        assert_eq!(chunk1.len(), 2);
1520        assert_eq!(chunk1[0].bid_price.precision, 4);
1521        assert_eq!(chunk1[1].bid_price.precision, 4);
1522
1523        let chunk2 = chunks[1].as_ref().unwrap();
1524        assert_eq!(chunk2.len(), 2);
1525        assert_eq!(chunk2[0].bid_price.precision, 4);
1526        assert_eq!(chunk2[1].bid_price.precision, 4);
1527
1528        let chunk3 = chunks[2].as_ref().unwrap();
1529        assert_eq!(chunk3.len(), 1);
1530        assert_eq!(chunk3[0].bid_price.precision, 4);
1531
1532        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1533        assert_eq!(total_quotes, 5);
1534
1535        std::fs::remove_file(&temp_file).ok();
1536    }
1537
1538    #[rstest]
1539    pub fn test_stream_trades_chunked() {
1540        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1541binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1542binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
1543binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1544binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
1545binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
1546
1547        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
1548        std::fs::write(&temp_file, csv_data).unwrap();
1549
1550        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1551        let chunks: Vec<_> = stream.collect();
1552
1553        assert_eq!(chunks.len(), 2);
1554
1555        let chunk1 = chunks[0].as_ref().unwrap();
1556        assert_eq!(chunk1.len(), 3);
1557        assert_eq!(chunk1[0].price.precision, 4);
1558        assert_eq!(chunk1[1].price.precision, 4);
1559        assert_eq!(chunk1[2].price.precision, 4);
1560
1561        let chunk2 = chunks[1].as_ref().unwrap();
1562        assert_eq!(chunk2.len(), 2);
1563        assert_eq!(chunk2[0].price.precision, 4);
1564        assert_eq!(chunk2[1].price.precision, 4);
1565
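        // The CSV `side` column maps to the aggressor side (buy -> Buyer, sell -> Seller).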
1566        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
1567        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);
1568
1569        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1570        assert_eq!(total_trades, 5);
1571
1572        std::fs::remove_file(&temp_file).ok();
1573    }
1574
1575    #[rstest]
1576    pub fn test_stream_trades_with_zero_sized_trade() {
1577        // Test CSV data with one zero-sized trade that should be skipped
1578        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1579binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1580binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1581binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1582binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1583
1584        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
1585        std::fs::write(&temp_file, csv_data).unwrap();
1586
1587        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1588        let chunks: Vec<_> = stream.collect();
1589
1590        // Should have 1 chunk with 3 valid trades (zero-sized trade skipped)
1591        assert_eq!(chunks.len(), 1);
1592
1593        let chunk1 = chunks[0].as_ref().unwrap();
1594        assert_eq!(chunk1.len(), 3);
1595
1596        // Verify the trades are the correct ones (not the zero-sized one)
1597        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
1598        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
1599        assert_eq!(chunk1[2].size, Quantity::from("3.0"));
1600
1601        // Verify trade IDs to confirm correct trades were loaded
1602        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
1603        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
1604        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));
1605
1606        std::fs::remove_file(&temp_file).ok();
1607    }
1608
1609    #[rstest]
1610    pub fn test_stream_depth10_from_snapshot5_chunked() {
1611        let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1612binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
1613binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
1614binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";
1615
1616        // Write to temporary file
1617        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
1618        std::fs::write(&temp_file, csv_data).unwrap();
1619
1620        // Stream with chunk size of 2
1621        let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
1622        let chunks: Vec<_> = stream.collect();
1623
1624        // Should have 2 chunks: [2 items, 1 item]
1625        assert_eq!(chunks.len(), 2);
1626
1627        // First chunk: 2 depth snapshots
1628        let chunk1 = chunks[0].as_ref().unwrap();
1629        assert_eq!(chunk1.len(), 2);
1630
1631        // Second chunk: 1 depth snapshot
1632        let chunk2 = chunks[1].as_ref().unwrap();
1633        assert_eq!(chunk2.len(), 1);
1634
1635        // Verify depth structure
1636        let first_depth = &chunk1[0];
1637        assert_eq!(first_depth.bids.len(), 10); // Should have 10 levels
1638        assert_eq!(first_depth.asks.len(), 10);
1639
1640        // Verify some specific prices
1641        assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
1642        assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));
1643
1644        // Verify total count
1645        let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1646        assert_eq!(total_depths, 3);
1647
1648        // Clean up
1649        std::fs::remove_file(&temp_file).ok();
1650    }
1651
1652    #[rstest]
1653    pub fn test_stream_depth10_from_snapshot25_chunked() {
1654        // Create minimal snapshot25 CSV data (only the first 5 levels populated for testing)
1655        let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];
1656
1657        // Add bid and ask levels (we'll only populate first few for testing)
1658        let mut bid_headers = Vec::new();
1659        let mut ask_headers = Vec::new();
1660        for i in 0..25 {
1661            bid_headers.push(format!("bids[{i}].price"));
1662            bid_headers.push(format!("bids[{i}].amount"));
1663        }
1664        for i in 0..25 {
1665            ask_headers.push(format!("asks[{i}].price"));
1666            ask_headers.push(format!("asks[{i}].amount"));
1667        }
1668
1669        for header in &bid_headers {
1670            header_parts.push(header);
1671        }
1672        for header in &ask_headers {
1673            header_parts.push(header);
1674        }
1675
1676        let header = header_parts.join(",");
1677
1678        // Create a row with data for first 5 levels (rest will be empty)
1679        let mut row1_parts = vec![
1680            "binance".to_string(),
1681            "BTCUSDT".to_string(),
1682            "1640995200000000".to_string(),
1683            "1640995200100000".to_string(),
1684        ];
1685
1686        // Add bid levels (first 5 with data, rest empty)
1687        for i in 0..25 {
1688            if i < 5 {
1689                let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
1690                let bid_amount = 1.0 + f64::from(i);
1691                row1_parts.push(bid_price.to_string());
1692                row1_parts.push(bid_amount.to_string());
1693            } else {
1694                row1_parts.push(String::new());
1695                row1_parts.push(String::new());
1696            }
1697        }
1698
1699        // Add ask levels (first 5 with data, rest empty)
1700        for i in 0..25 {
1701            if i < 5 {
1702                let ask_price = f64::from(i).mul_add(0.01, 50000.0);
1703                let ask_amount = 1.0 + f64::from(i);
1704                row1_parts.push(ask_price.to_string());
1705                row1_parts.push(ask_amount.to_string());
1706            } else {
1707                row1_parts.push(String::new());
1708                row1_parts.push(String::new());
1709            }
1710        }
1711
1712        let csv_data = format!("{}\n{}", header, row1_parts.join(","));
1713
1714        // Write to temporary file
1715        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
1716        std::fs::write(&temp_file, &csv_data).unwrap();
1717
1718        // Stream with chunk size of 1
1719        let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
1720        let chunks: Vec<_> = stream.collect();
1721
1722        // Should have 1 chunk with 1 item
1723        assert_eq!(chunks.len(), 1);
1724
1725        let chunk1 = chunks[0].as_ref().unwrap();
1726        assert_eq!(chunk1.len(), 1);
1727
1728        // Verify depth structure
1729        let depth = &chunk1[0];
1730        assert_eq!(depth.bids.len(), 10); // Should have 10 levels
1731        assert_eq!(depth.asks.len(), 10);
1732
1733        // Verify the first level has data (prices should be positive)
1734        let actual_bid_price = depth.bids[0].price;
1735        let actual_ask_price = depth.asks[0].price;
1736        assert!(actual_bid_price.as_f64() > 0.0);
1737        assert!(actual_ask_price.as_f64() > 0.0);
1738
1739        // Clean up
1740        std::fs::remove_file(&temp_file).ok();
1741    }
1742
1743    #[rstest]
1744    pub fn test_stream_error_handling() {
1745        // Test with non-existent file
1746        let non_existent = std::path::Path::new("does_not_exist.csv");
1747
1748        let result = stream_deltas(non_existent, 10, None, None, None, None);
1749        assert!(result.is_err());
1750
1751        let result = stream_quotes(non_existent, 10, None, None, None, None);
1752        assert!(result.is_err());
1753
1754        let result = stream_trades(non_existent, 10, None, None, None, None);
1755        assert!(result.is_err());
1756
1757        let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
1758        assert!(result.is_err());
1759
1760        let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
1761        assert!(result.is_err());
1762    }
1763
1764    #[rstest]
1765    pub fn test_stream_empty_file() {
1766        // Test with empty CSV file
1767        let temp_file = std::env::temp_dir().join("test_empty.csv");
1768        std::fs::write(&temp_file, "").unwrap();
1769
1770        let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
1771        assert_eq!(stream.count(), 0);
1772
1773        // Clean up
1774        std::fs::remove_file(&temp_file).ok();
1775    }
1776
1777    #[rstest]
1778    pub fn test_stream_precision_consistency() {
1779        // Test that streaming produces the same deltas as bulk loading (precision inference is chunk-dependent, so it is not compared)
1780        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1781binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1782binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1783binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1784binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";
1785
1786        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
1787        std::fs::write(&temp_file, csv_data).unwrap();
1788
1789        // Load all at once
1790        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
1791
1792        // Stream in chunks and collect
1793        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
1794        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
1795
1796        // Should have same number of deltas
1797        assert_eq!(bulk_deltas.len(), streamed_deltas.len());
1798
1799        // Compare key properties (precision inference may differ due to chunking)
1800        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
1801            assert_eq!(bulk.instrument_id, streamed.instrument_id);
1802            assert_eq!(bulk.action, streamed.action);
1803            assert_eq!(bulk.order.side, streamed.order.side);
1804            assert_eq!(bulk.ts_event, streamed.ts_event);
1805            assert_eq!(bulk.ts_init, streamed.ts_init);
1806            // Note: precision may differ between bulk and streaming due to chunk boundaries
1807        }
1808
1809        // Clean up
1810        std::fs::remove_file(&temp_file).ok();
1811    }
1812
1813    #[rstest]
1814    pub fn test_stream_trades_from_local_file() {
1815        let filepath = get_test_data_path("csv/trades_1.csv");
1816        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();
1817
1818        let chunk1 = stream.next().unwrap().unwrap();
1819        assert_eq!(chunk1.len(), 1);
1820        assert_eq!(chunk1[0].price, Price::from("8531.5"));
1821
1822        let chunk2 = stream.next().unwrap().unwrap();
1823        assert_eq!(chunk2.len(), 1);
1824        assert_eq!(chunk2[0].size, Quantity::from("1000"));
1825
1826        assert!(stream.next().is_none());
1827    }
1828
1829    #[rstest]
1830    pub fn test_stream_deltas_from_local_file() {
1831        let filepath = get_test_data_path("csv/deltas_1.csv");
1832        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();
1833
1834        // With chunk_size=1, each delta gets its own chunk
1835        // First chunk: CLEAR
1836        let chunk1 = stream.next().unwrap().unwrap();
1837        assert_eq!(chunk1.len(), 1);
1838        assert_eq!(chunk1[0].action, BookAction::Clear);
1839
1840        // Second chunk: first data delta
1841        let chunk2 = stream.next().unwrap().unwrap();
1842        assert_eq!(chunk2.len(), 1);
1843        assert_eq!(chunk2[0].order.price, Price::from("6421.5"));
1844
1845        // Third chunk: second data delta
1846        let chunk3 = stream.next().unwrap().unwrap();
1847        assert_eq!(chunk3.len(), 1);
1848        assert_eq!(chunk3[0].order.size, Quantity::from("10000"));
1849
1850        assert!(stream.next().is_none());
1851    }
1852
1853    #[rstest]
1854    pub fn test_stream_deltas_with_limit() {
1855        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1856binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
1857binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
1858binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
1859binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
1860binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";
1861
1862        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
1863        std::fs::write(&temp_file, csv_data).unwrap();
1864
1865        // Test with limit of 3 records
1866        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
1867        let chunks: Vec<_> = stream.collect();
1868
1869        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
1870        assert_eq!(chunks.len(), 2);
1871        let chunk1 = chunks[0].as_ref().unwrap();
1872        assert_eq!(chunk1.len(), 2);
1873        let chunk2 = chunks[1].as_ref().unwrap();
1874        assert_eq!(chunk2.len(), 1);
1875
1876        // Total should be exactly 3 records due to limit
1877        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1878        assert_eq!(total_deltas, 3);
1879
1880        std::fs::remove_file(&temp_file).ok();
1881    }
1882
1883    #[rstest]
1884    pub fn test_stream_quotes_with_limit() {
1885        let csv_data =
1886            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
1887binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
1888binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
1889binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
1890binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";
1891
1892        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
1893        std::fs::write(&temp_file, csv_data).unwrap();
1894
1895        // Test with limit of 2 records
1896        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
1897        let chunks: Vec<_> = stream.collect();
1898
1899        // Should have 1 chunk with 2 items (limited)
1900        assert_eq!(chunks.len(), 1);
1901        let chunk1 = chunks[0].as_ref().unwrap();
1902        assert_eq!(chunk1.len(), 2);
1903
1904        // Verify we get exactly 2 records
1905        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1906        assert_eq!(total_quotes, 2);
1907
1908        std::fs::remove_file(&temp_file).ok();
1909    }
1910
1911    #[rstest]
1912    pub fn test_stream_trades_with_limit() {
1913        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1914binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1915binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
1916binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1917binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
1918binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
1919
1920        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
1921        std::fs::write(&temp_file, csv_data).unwrap();
1922
1923        // Test with limit of 3 records
1924        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
1925        let chunks: Vec<_> = stream.collect();
1926
1927        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
1928        assert_eq!(chunks.len(), 2);
1929        let chunk1 = chunks[0].as_ref().unwrap();
1930        assert_eq!(chunk1.len(), 2);
1931        let chunk2 = chunks[1].as_ref().unwrap();
1932        assert_eq!(chunk2.len(), 1);
1933
1934        // Verify we get exactly 3 records
1935        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1936        assert_eq!(total_trades, 3);
1937
1938        std::fs::remove_file(&temp_file).ok();
1939    }
1940
1941    #[rstest]
1942    pub fn test_depth10_invalid_levels_error_at_construction() {
1943        let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
1944        std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();
1945
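        // Only 5 (snapshot5) and 25 (snapshot25) levels are supported; other values should error at construction.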
1946        let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
1947        assert!(result.is_err());
1948        let err_msg = result.err().unwrap().to_string();
1949        assert!(
1950            err_msg.contains("Invalid levels"),
1951            "Error should mention 'Invalid levels': {err_msg}"
1952        );
1953
1954        let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
1955        assert!(result.is_err());
1956
1957        let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
1958        assert!(result.is_ok());
1959
1960        let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
1961        assert!(result.is_ok());
1962
1963        std::fs::remove_file(&temp_file).ok();
1964    }
1965
1966    #[rstest]
1967    pub fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
1968        // CSV with:
1969        // - Initial snapshot (is_snapshot=true) at start
1970        // - Some deltas (is_snapshot=false)
1971        // - Mid-day snapshot (is_snapshot=true) - should trigger CLEAR
1972        // - Back to deltas (is_snapshot=false)
1973        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1974binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1975binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1976binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1977binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1978binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
1979binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
1980binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";
1981
1982        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
1983        std::fs::write(&temp_file, csv_data).unwrap();
1984
1985        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
1986        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
1987
1988        let clear_count = all_deltas
1989            .iter()
1990            .filter(|d| d.action == BookAction::Clear)
1991            .count();
1992
1993        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
1994        assert_eq!(
1995            clear_count, 2,
1996            "Expected 2 CLEAR deltas (initial + mid-day snapshot), got {clear_count}"
1997        );
1998
1999        // Verify CLEAR positions:
2000        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=CLEAR, 6=Add, 7=Add, 8=Update
2001        assert_eq!(all_deltas[0].action, BookAction::Clear);
2002        assert_eq!(all_deltas[5].action, BookAction::Clear);
2003
2004        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
2005        assert_eq!(
2006            all_deltas[0].flags & RecordFlag::F_LAST.value(),
2007            0,
2008            "CLEAR at index 0 should not have F_LAST flag"
2009        );
2010        assert_eq!(
2011            all_deltas[5].flags & RecordFlag::F_LAST.value(),
2012            0,
2013            "CLEAR at index 5 should not have F_LAST flag"
2014        );
2015
2016        std::fs::remove_file(&temp_file).ok();
2017    }
2018
2019    #[rstest]
2020    pub fn test_load_deltas_with_mid_snapshot_inserts_clear() {
2021        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
2022        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();
2023
2024        let clear_count = deltas
2025            .iter()
2026            .filter(|d| d.action == BookAction::Clear)
2027            .count();
2028
2029        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
2030        assert_eq!(
2031            clear_count, 2,
2032            "Expected 2 CLEAR deltas (initial + mid-day snapshot), got {clear_count}"
2033        );
2034
2035        assert_eq!(deltas[0].action, BookAction::Clear);
2036
2037        let second_clear_idx = deltas
2038            .iter()
2039            .enumerate()
2040            .filter(|(_, d)| d.action == BookAction::Clear)
2041            .nth(1)
2042            .map(|(i, _)| i)
2043            .expect("Should have second CLEAR");
2044
2045        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=Delete, 6=CLEAR
2046        assert_eq!(
2047            second_clear_idx, 6,
2048            "Second CLEAR should be at index 6, got {second_clear_idx}"
2049        );
2050
2051        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
2052        assert_eq!(
2053            deltas[0].flags & RecordFlag::F_LAST.value(),
2054            0,
2055            "CLEAR at index 0 should not have F_LAST flag"
2056        );
2057        assert_eq!(
2058            deltas[6].flags & RecordFlag::F_LAST.value(),
2059            0,
2060            "CLEAR at index 6 should not have F_LAST flag"
2061        );
2062    }
2063
2064    #[rstest]
2065    fn test_stream_deltas_chunk_size_respects_clear() {
2066        // Test that chunk_size applies to total emitted deltas (including CLEARs)
2067        // With chunk_size=1, a snapshot boundary should emit CLEAR in one chunk
2068        // and the real delta in the next chunk
2069        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2070binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2071binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2072
2073        let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
2074        std::fs::write(&temp_file, csv_data).unwrap();
2075
2076        // chunk_size=1 should produce separate chunks for CLEAR and real deltas
2077        let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
2078        let chunks: Vec<_> = stream.collect();
2079
2080        // Should have 3 chunks: [CLEAR], [data], [data]
2081        assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
2082        assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
2083        assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
2084        assert_eq!(chunks[2].as_ref().unwrap().len(), 1);
2085
2086        // First chunk should be CLEAR
2087        assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
2088        // Second and third chunks should be data deltas
2089        assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
2090        assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);
2091
2092        std::fs::remove_file(&temp_file).ok();
2093    }
2094
2095    #[rstest]
2096    fn test_stream_deltas_limit_stops_at_clear() {
2097        // Test that limit=1 with snapshot data returns only the CLEAR delta
2098        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2099binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2100binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2101
2102        let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
2103        std::fs::write(&temp_file, csv_data).unwrap();
2104
2105        // limit=1 should only get the CLEAR delta
2106        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
2107        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2108
2109        assert_eq!(all_deltas.len(), 1);
2110        assert_eq!(all_deltas[0].action, BookAction::Clear);
2111
2112        std::fs::remove_file(&temp_file).ok();
2113    }
2114
2115    #[rstest]
2116    fn test_stream_deltas_limit_includes_clear() {
2117        // Test that limit counts total emitted deltas (including CLEARs)
2118        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2119binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2120binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2121binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2122binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2123binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2124
2125        let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
2126        std::fs::write(&temp_file, csv_data).unwrap();
2127
2128        // limit=4 should get exactly 4 deltas: 1 CLEAR + 3 data deltas
2129        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
2130        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2131
2132        assert_eq!(all_deltas.len(), 4);
2133        assert_eq!(all_deltas[0].action, BookAction::Clear);
2134        assert_eq!(all_deltas[1].action, BookAction::Add);
2135        assert_eq!(all_deltas[2].action, BookAction::Add);
2136        assert_eq!(all_deltas[3].action, BookAction::Update);
2137
2138        std::fs::remove_file(&temp_file).ok();
2139    }
2140
2141    #[rstest]
2142    fn test_stream_deltas_limit_sets_f_last() {
2143        // Test that F_LAST is set on the final delta when limit is reached
2144        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2145binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2146binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2147binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2148binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2149binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2150
2151        let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
2152        std::fs::write(&temp_file, csv_data).unwrap();
2153
2154        // limit=3 should get 3 deltas with F_LAST on the last one
2155        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
2156        let chunks: Vec<_> = stream.collect();
2157
2158        // Should have 1 chunk with 3 deltas
2159        assert_eq!(chunks.len(), 1);
2160        let deltas = chunks[0].as_ref().unwrap();
2161        assert_eq!(deltas.len(), 3);
2162
2163        // Final delta should have F_LAST flag
2164        assert_eq!(
2165            deltas[2].flags & RecordFlag::F_LAST.value(),
2166            RecordFlag::F_LAST.value(),
2167            "Final delta should have F_LAST flag when limit is reached"
2168        );
2169
2170        std::fs::remove_file(&temp_file).ok();
2171    }
2172
2173    #[rstest]
2174    fn test_stream_deltas_chunk_boundary_no_f_last() {
2175        // Test that F_LAST is NOT set when only chunk_size boundary is hit (more data follows)
2176        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2177binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
2178binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
2179binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";
2180
2181        let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
2182        std::fs::write(&temp_file, csv_data).unwrap();
2183
2184        // chunk_size=2, no limit - first chunk should NOT have F_LAST (more data follows)
2185        let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();
2186
2187        let chunk1 = stream.next().unwrap().unwrap();
2188        assert_eq!(chunk1.len(), 2);
2189
2190        // First chunk's last delta should NOT have F_LAST (more data follows with same timestamp)
2191        assert_eq!(
2192            chunk1[1].flags & RecordFlag::F_LAST.value(),
2193            0,
2194            "Mid-stream chunk should not have F_LAST flag"
2195        );
2196
2197        // Second chunk exists and has F_LAST (end of file)
2198        let chunk2 = stream.next().unwrap().unwrap();
2199        assert_eq!(chunk2.len(), 1);
2200        assert_eq!(
2201            chunk2[0].flags & RecordFlag::F_LAST.value(),
2202            RecordFlag::F_LAST.value(),
2203            "Final chunk at EOF should have F_LAST flag"
2204        );
2205
2206        std::fs::remove_file(&temp_file).ok();
2207    }
2208}