nautilus_tardis/csv/stream.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{io::Read, path::Path};
17
18use csv::{Reader, StringRecord};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22    enums::{OrderSide, RecordFlag},
23    identifiers::InstrumentId,
24    types::Quantity,
25};
26#[cfg(feature = "python")]
27use nautilus_model::{
28    data::{Data, OrderBookDeltas, OrderBookDeltas_API},
29    python::data::data_to_pycapsule,
30};
31#[cfg(feature = "python")]
32use pyo3::{Py, PyAny, Python};
33
34use crate::{
35    csv::{
36        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
37        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
38        record::{
39            TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
40            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
41        },
42    },
43    parse::{parse_instrument_id, parse_timestamp},
44};
45
46////////////////////////////////////////////////////////////////////////////////
47// OrderBookDelta Streaming
48////////////////////////////////////////////////////////////////////////////////
49
50/// Streaming iterator over CSV records that yields chunks of parsed data.
51struct DeltaStreamIterator {
52    reader: Reader<Box<dyn std::io::Read>>,
53    record: StringRecord,
54    buffer: Vec<OrderBookDelta>,
55    chunk_size: usize,
56    instrument_id: Option<InstrumentId>,
57    price_precision: u8,
58    size_precision: u8,
59    last_ts_event: UnixNanos,
60    last_is_snapshot: bool,
61    limit: Option<usize>,
62    deltas_emitted: usize,
63
64    /// Pending record to process in next iteration (when CLEAR filled the chunk).
65    pending_record: Option<TardisBookUpdateRecord>,
66}
67
68impl DeltaStreamIterator {
69    /// Creates a new [`DeltaStreamIterator`].
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the file cannot be opened or read.
74    fn new<P: AsRef<Path>>(
75        filepath: P,
76        chunk_size: usize,
77        price_precision: Option<u8>,
78        size_precision: Option<u8>,
79        instrument_id: Option<InstrumentId>,
80        limit: Option<usize>,
81    ) -> anyhow::Result<Self> {
82        let (final_price_precision, final_size_precision) =
83            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
84                // Both precisions provided, use them directly
85                (price_prec, size_prec)
86            } else {
87                // One or both precisions missing, detect only the missing ones
88                let mut reader = create_csv_reader(&filepath)?;
89                let mut record = StringRecord::new();
90                let (detected_price, detected_size) =
91                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
92                (
93                    price_precision.unwrap_or(detected_price),
94                    size_precision.unwrap_or(detected_size),
95                )
96            };
97
98        let reader = create_csv_reader(filepath)?;
99
100        Ok(Self {
101            reader,
102            record: StringRecord::new(),
103            buffer: Vec::with_capacity(chunk_size),
104            chunk_size,
105            instrument_id,
106            price_precision: final_price_precision,
107            size_precision: final_size_precision,
108            last_ts_event: UnixNanos::default(),
109            last_is_snapshot: false,
110            limit,
111            deltas_emitted: 0,
112            pending_record: None,
113        })
114    }
115
116    fn detect_precision_from_sample(
117        reader: &mut Reader<Box<dyn std::io::Read>>,
118        record: &mut StringRecord,
119        sample_size: usize,
120    ) -> anyhow::Result<(u8, u8)> {
121        let mut max_price_precision = 0u8;
122        let mut max_size_precision = 0u8;
123        let mut records_scanned = 0;
124
125        while records_scanned < sample_size {
126            match reader.read_record(record) {
127                Ok(true) => {
128                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
129                        max_price_precision = max_price_precision.max(infer_precision(data.price));
130                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
131                        records_scanned += 1;
132                    }
133                }
134                Ok(false) => break,             // End of file
135                Err(_) => records_scanned += 1, // Skip malformed records
136            }
137        }
138
139        Ok((max_price_precision, max_size_precision))
140    }
141}
142
143impl Iterator for DeltaStreamIterator {
144    type Item = anyhow::Result<Vec<OrderBookDelta>>;
145
146    fn next(&mut self) -> Option<Self::Item> {
147        if let Some(limit) = self.limit
148            && self.deltas_emitted >= limit
149        {
150            return None;
151        }
152
153        self.buffer.clear();
154
155        loop {
156            if self.buffer.len() >= self.chunk_size {
157                break;
158            }
159
160            if let Some(limit) = self.limit
161                && self.deltas_emitted >= limit
162            {
163                break;
164            }
165
166            // Use pending record from previous iteration, or read new
167            let data = if let Some(pending) = self.pending_record.take() {
168                pending
169            } else {
170                match self.reader.read_record(&mut self.record) {
171                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
172                        Ok(data) => data,
173                        Err(e) => {
174                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
175                        }
176                    },
177                    Ok(false) => {
178                        if self.buffer.is_empty() {
179                            return None;
180                        }
181                        if let Some(last_delta) = self.buffer.last_mut() {
182                            last_delta.flags = RecordFlag::F_LAST.value();
183                        }
184                        return Some(Ok(self.buffer.clone()));
185                    }
186                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
187                }
188            };
189
190            // Insert CLEAR on snapshot boundary to reset order book state
191            if data.is_snapshot && !self.last_is_snapshot {
192                let clear_instrument_id = self
193                    .instrument_id
194                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
195                let ts_event = parse_timestamp(data.timestamp);
196                let ts_init = parse_timestamp(data.local_timestamp);
197
198                if self.last_ts_event != ts_event
199                    && let Some(last_delta) = self.buffer.last_mut()
200                {
201                    last_delta.flags = RecordFlag::F_LAST.value();
202                }
203                self.last_ts_event = ts_event;
204
205                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
206                self.buffer.push(clear_delta);
207                self.deltas_emitted += 1;
208
209                // Defer real delta to next chunk if constraints reached
210                if self.buffer.len() >= self.chunk_size
211                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
212                {
213                    self.last_is_snapshot = data.is_snapshot;
214                    self.pending_record = Some(data);
215                    break;
216                }
217            }
218            self.last_is_snapshot = data.is_snapshot;
219
220            let delta = match parse_delta_record(
221                &data,
222                self.price_precision,
223                self.size_precision,
224                self.instrument_id,
225            ) {
226                Ok(d) => d,
227                Err(e) => {
228                    log::warn!("Skipping invalid delta record: {e}");
229                    continue;
230                }
231            };
232
233            if self.last_ts_event != delta.ts_event
234                && let Some(last_delta) = self.buffer.last_mut()
235            {
236                last_delta.flags = RecordFlag::F_LAST.value();
237            }
238
239            self.last_ts_event = delta.ts_event;
240
241            self.buffer.push(delta);
242            self.deltas_emitted += 1;
243        }
244
245        if self.buffer.is_empty() {
246            None
247        } else {
248            // Only set F_LAST when limit reached (stream ending), not on chunk
249            // boundary where more same-timestamp deltas may follow
250            if let Some(limit) = self.limit
251                && self.deltas_emitted >= limit
252                && let Some(last_delta) = self.buffer.last_mut()
253            {
254                last_delta.flags = RecordFlag::F_LAST.value();
255            }
256            Some(Ok(self.buffer.clone()))
257        }
258    }
259}
260
261/// Streams [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
262/// yielding chunks of the specified size.
263///
264/// # Precision Inference Warning
265///
266/// When streaming with precision inference (not providing explicit precisions), the
267/// inferred precision may differ from bulk loading the entire file: inference is based
268/// on a sample of the first 10,000 records only, so later records may contain values
269/// requiring higher precision than inferred. For deterministic precision behavior,
270/// provide explicit `price_precision` and `size_precision` parameters.
271///
272/// # Errors
273///
274/// Returns an error if the file cannot be opened, read, or parsed as CSV.
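///
/// # Example
///
/// A minimal usage sketch; the file path is illustrative and the import path assumes
/// this function is re-exported from the crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_deltas;
///
/// // Chunks of up to 100,000 deltas, with explicit precisions (1 dp prices, 3 dp sizes).
/// let stream = stream_deltas("deltas.csv", 100_000, Some(1), Some(3), None, None).unwrap();
/// for chunk in stream {
///     let deltas = chunk.unwrap();
///     println!("chunk of {} deltas", deltas.len());
/// }
/// ```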
275pub fn stream_deltas<P: AsRef<Path>>(
276    filepath: P,
277    chunk_size: usize,
278    price_precision: Option<u8>,
279    size_precision: Option<u8>,
280    instrument_id: Option<InstrumentId>,
281    limit: Option<usize>,
282) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
283    DeltaStreamIterator::new(
284        filepath,
285        chunk_size,
286        price_precision,
287        size_precision,
288        instrument_id,
289        limit,
290    )
291}
292
293////////////////////////////////////////////////////////////////////////////////
294// Vec<Py<PyAny>> (OrderBookDeltas as PyCapsule) Streaming
295////////////////////////////////////////////////////////////////////////////////
296
297#[cfg(feature = "python")]
298/// Streaming iterator over CSV records that yields chunks of parsed data.
299struct BatchedDeltasStreamIterator {
300    reader: Reader<Box<dyn std::io::Read>>,
301    record: StringRecord,
302    buffer: Vec<Py<PyAny>>,
303    current_batch: Vec<OrderBookDelta>,
304    pending_batches: Vec<Vec<OrderBookDelta>>,
305    chunk_size: usize,
306    instrument_id: InstrumentId,
307    price_precision: u8,
308    size_precision: u8,
309    last_ts_event: UnixNanos,
310    last_is_snapshot: bool,
311    limit: Option<usize>,
312    deltas_emitted: usize,
313}
314
315#[cfg(feature = "python")]
316impl BatchedDeltasStreamIterator {
317    /// Creates a new [`BatchedDeltasStreamIterator`].
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the file cannot be opened or read.
322    fn new<P: AsRef<Path>>(
323        filepath: P,
324        chunk_size: usize,
325        price_precision: Option<u8>,
326        size_precision: Option<u8>,
327        instrument_id: Option<InstrumentId>,
328        limit: Option<usize>,
329    ) -> anyhow::Result<Self> {
330        let mut reader = create_csv_reader(&filepath)?;
331        let mut record = StringRecord::new();
332
333        let first_record = if reader.read_record(&mut record)? {
334            record.deserialize::<TardisBookUpdateRecord>(None)?
335        } else {
336            anyhow::bail!("CSV file is empty");
337        };
338
339        let final_instrument_id = instrument_id
340            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));
341
342        let (final_price_precision, final_size_precision) =
343            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
344                // Both precisions provided, use them directly
345                (price_prec, size_prec)
346            } else {
347                // One or both precisions missing, detect from a sample of subsequent records
348                let (detected_price, detected_size) =
349                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
350                (
351                    price_precision.unwrap_or(detected_price),
352                    size_precision.unwrap_or(detected_size),
353                )
354            };
355
356        let reader = create_csv_reader(filepath)?;
357
358        Ok(Self {
359            reader,
360            record: StringRecord::new(),
361            buffer: Vec::with_capacity(chunk_size),
362            current_batch: Vec::new(),
363            pending_batches: Vec::with_capacity(chunk_size),
364            chunk_size,
365            instrument_id: final_instrument_id,
366            price_precision: final_price_precision,
367            size_precision: final_size_precision,
368            last_ts_event: UnixNanos::default(),
369            last_is_snapshot: false,
370            limit,
371            deltas_emitted: 0,
372        })
373    }
374
375    fn detect_precision_from_sample(
376        reader: &mut Reader<Box<dyn std::io::Read>>,
377        record: &mut StringRecord,
378        sample_size: usize,
379    ) -> anyhow::Result<(u8, u8)> {
380        let mut max_price_precision = 0u8;
381        let mut max_size_precision = 0u8;
382        let mut records_scanned = 0;
383
384        while records_scanned < sample_size {
385            match reader.read_record(record) {
386                Ok(true) => {
387                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
388                        max_price_precision = max_price_precision.max(infer_precision(data.price));
389                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
390                        records_scanned += 1;
391                    }
392                }
393                Ok(false) => break,             // End of file
394                Err(_) => records_scanned += 1, // Skip malformed records
395            }
396        }
397
398        Ok((max_price_precision, max_size_precision))
399    }
400
401    fn fill_pending_batches(&mut self) -> Option<anyhow::Result<()>> {
402        self.pending_batches.clear();
403        let mut batches_created = 0;
404
405        while batches_created < self.chunk_size {
406            if let Some(limit) = self.limit
407                && self.deltas_emitted >= limit
408            {
409                break;
410            }
411
412            match self.reader.read_record(&mut self.record) {
413                Ok(true) => {
414                    let data = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
415                        Ok(data) => data,
416                        Err(e) => {
417                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
418                        }
419                    };
420
421                    let ts_event = parse_timestamp(data.timestamp);
422                    let ts_init = parse_timestamp(data.local_timestamp);
423
424                    // Parse before any state changes so invalid records
425                    // don't corrupt batch boundaries or snapshot tracking
426                    let delta = match parse_delta_record(
427                        &data,
428                        self.price_precision,
429                        self.size_precision,
430                        Some(self.instrument_id),
431                    ) {
432                        Ok(d) => d,
433                        Err(e) => {
434                            log::warn!("Skipping invalid delta record: {e}");
435                            continue;
436                        }
437                    };
438
439                    if self.last_ts_event != ts_event && !self.current_batch.is_empty() {
440                        // Set F_LAST on the last delta of the completed batch
441                        if let Some(last_delta) = self.current_batch.last_mut() {
442                            last_delta.flags = RecordFlag::F_LAST.value();
443                        }
444                        self.pending_batches
445                            .push(std::mem::take(&mut self.current_batch));
446                        batches_created += 1;
447                    }
448
449                    self.last_ts_event = ts_event;
450
451                    // Insert CLEAR on snapshot boundary to reset order book state
452                    if data.is_snapshot && !self.last_is_snapshot {
453                        let clear_delta =
454                            OrderBookDelta::clear(self.instrument_id, 0, ts_event, ts_init);
455                        self.current_batch.push(clear_delta);
456                        self.deltas_emitted += 1;
457
458                        if let Some(limit) = self.limit
459                            && self.deltas_emitted >= limit
460                        {
461                            self.last_is_snapshot = data.is_snapshot;
462                            break;
463                        }
464                    }
465                    self.last_is_snapshot = data.is_snapshot;
466
467                    self.current_batch.push(delta);
468                    self.deltas_emitted += 1;
469
470                    if let Some(limit) = self.limit
471                        && self.deltas_emitted >= limit
472                    {
473                        break;
474                    }
475                }
476                Ok(false) => {
477                    // End of file
478                    break;
479                }
480                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
481            }
482        }
483
484        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
485            // Ensure the last delta of the last batch has F_LAST set
486            if let Some(last_delta) = self.current_batch.last_mut() {
487                last_delta.flags = RecordFlag::F_LAST.value();
488            }
489            self.pending_batches
490                .push(std::mem::take(&mut self.current_batch));
491        }
492
493        if self.pending_batches.is_empty() {
494            None
495        } else {
496            Some(Ok(()))
497        }
498    }
499}
500
501#[cfg(feature = "python")]
502impl Iterator for BatchedDeltasStreamIterator {
503    type Item = anyhow::Result<Vec<Py<PyAny>>>;
504
505    fn next(&mut self) -> Option<Self::Item> {
506        if let Some(limit) = self.limit
507            && self.deltas_emitted >= limit
508        {
509            return None;
510        }
511
512        self.buffer.clear();
513        if let Some(Err(e)) = self.fill_pending_batches() {
514            return Some(Err(e));
515        }
516
517        if self.pending_batches.is_empty() {
518            None
519        } else {
520            // Create all capsules in a single GIL acquisition
521            Python::attach(|py| {
522                for batch in self.pending_batches.drain(..) {
523                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
524                    let deltas = OrderBookDeltas_API::new(deltas);
525                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
526                    self.buffer.push(capsule);
527                }
528            });
529            Some(Ok(std::mem::take(&mut self.buffer)))
530        }
531    }
532}
533
534#[cfg(feature = "python")]
535/// Streams [`OrderBookDeltas`] batches as Python capsules (`Py<PyAny>`) from a Tardis format
536/// CSV at the given `filepath`, yielding chunks of the specified size.
537///
538/// # Errors
539///
540/// Returns an error if the file cannot be opened, read, or parsed as CSV.
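///
/// # Example
///
/// A minimal sketch, assuming the `python` feature is enabled; the file path is
/// illustrative and the import path assumes a re-export from the crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_batched_deltas;
///
/// // Each yielded element is a Vec of PyCapsule objects, one capsule per ts_event batch.
/// let stream = stream_batched_deltas("deltas.csv", 1_000, Some(1), Some(3), None, None).unwrap();
/// for chunk in stream {
///     let capsules = chunk.unwrap();
///     println!("received {} delta batches", capsules.len());
/// }
/// ```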
541pub fn stream_batched_deltas<P: AsRef<Path>>(
542    filepath: P,
543    chunk_size: usize,
544    price_precision: Option<u8>,
545    size_precision: Option<u8>,
546    instrument_id: Option<InstrumentId>,
547    limit: Option<usize>,
548) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
549    BatchedDeltasStreamIterator::new(
550        filepath,
551        chunk_size,
552        price_precision,
553        size_precision,
554        instrument_id,
555        limit,
556    )
557}
558
559////////////////////////////////////////////////////////////////////////////////
560// Quote Streaming
561////////////////////////////////////////////////////////////////////////////////
562
563/// An iterator for streaming [`QuoteTick`]s from a Tardis CSV file in chunks.
564struct QuoteStreamIterator {
565    reader: Reader<Box<dyn Read>>,
566    record: StringRecord,
567    buffer: Vec<QuoteTick>,
568    chunk_size: usize,
569    instrument_id: Option<InstrumentId>,
570    price_precision: u8,
571    size_precision: u8,
572    limit: Option<usize>,
573    records_processed: usize,
574}
575
576impl QuoteStreamIterator {
577    /// Creates a new [`QuoteStreamIterator`].
578    ///
579    /// # Errors
580    ///
581    /// Returns an error if the file cannot be opened or read.
582    pub fn new<P: AsRef<Path>>(
583        filepath: P,
584        chunk_size: usize,
585        price_precision: Option<u8>,
586        size_precision: Option<u8>,
587        instrument_id: Option<InstrumentId>,
588        limit: Option<usize>,
589    ) -> anyhow::Result<Self> {
590        let (final_price_precision, final_size_precision) =
591            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
592                // Both precisions provided, use them directly
593                (price_prec, size_prec)
594            } else {
595                // One or both precisions missing, detect only the missing ones
596                let mut reader = create_csv_reader(&filepath)?;
597                let mut record = StringRecord::new();
598                let (detected_price, detected_size) =
599                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
600                (
601                    price_precision.unwrap_or(detected_price),
602                    size_precision.unwrap_or(detected_size),
603                )
604            };
605
606        let reader = create_csv_reader(filepath)?;
607
608        Ok(Self {
609            reader,
610            record: StringRecord::new(),
611            buffer: Vec::with_capacity(chunk_size),
612            chunk_size,
613            instrument_id,
614            price_precision: final_price_precision,
615            size_precision: final_size_precision,
616            limit,
617            records_processed: 0,
618        })
619    }
620
621    fn detect_precision_from_sample(
622        reader: &mut Reader<Box<dyn std::io::Read>>,
623        record: &mut StringRecord,
624        sample_size: usize,
625    ) -> anyhow::Result<(u8, u8)> {
626        let mut max_price_precision = 2u8;
627        let mut max_size_precision = 0u8;
628        let mut records_scanned = 0;
629
630        while records_scanned < sample_size {
631            match reader.read_record(record) {
632                Ok(true) => {
633                    if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
634                        if let Some(bid_price_val) = data.bid_price {
635                            max_price_precision =
636                                max_price_precision.max(infer_precision(bid_price_val));
637                        }
638                        if let Some(ask_price_val) = data.ask_price {
639                            max_price_precision =
640                                max_price_precision.max(infer_precision(ask_price_val));
641                        }
642                        if let Some(bid_amount_val) = data.bid_amount {
643                            max_size_precision =
644                                max_size_precision.max(infer_precision(bid_amount_val));
645                        }
646                        if let Some(ask_amount_val) = data.ask_amount {
647                            max_size_precision =
648                                max_size_precision.max(infer_precision(ask_amount_val));
649                        }
650                        records_scanned += 1;
651                    }
652                }
653                Ok(false) => break,             // End of file
654                Err(_) => records_scanned += 1, // Skip malformed records
655            }
656        }
657
658        Ok((max_price_precision, max_size_precision))
659    }
660}
661
662impl Iterator for QuoteStreamIterator {
663    type Item = anyhow::Result<Vec<QuoteTick>>;
664
665    fn next(&mut self) -> Option<Self::Item> {
666        if let Some(limit) = self.limit
667            && self.records_processed >= limit
668        {
669            return None;
670        }
671
672        self.buffer.clear();
673        let mut records_read = 0;
674
675        while records_read < self.chunk_size {
676            match self.reader.read_record(&mut self.record) {
677                Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
678                    Ok(data) => {
679                        let quote = parse_quote_record(
680                            &data,
681                            self.price_precision,
682                            self.size_precision,
683                            self.instrument_id,
684                        );
685
686                        self.buffer.push(quote);
687                        records_read += 1;
688                        self.records_processed += 1;
689
690                        if let Some(limit) = self.limit
691                            && self.records_processed >= limit
692                        {
693                            break;
694                        }
695                    }
696                    Err(e) => {
697                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
698                    }
699                },
700                Ok(false) => {
701                    if self.buffer.is_empty() {
702                        return None;
703                    }
704                    return Some(Ok(self.buffer.clone()));
705                }
706                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
707            }
708        }
709
710        if self.buffer.is_empty() {
711            None
712        } else {
713            Some(Ok(self.buffer.clone()))
714        }
715    }
716}
717
718/// Streams [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
719/// yielding chunks of the specified size.
720///
721/// # Precision Inference Warning
722///
723/// When streaming with precision inference (not providing explicit precisions), the
724/// inferred precision may differ from bulk loading the entire file: inference is based
725/// on a sample of the first 10,000 records only, so later records may contain values
726/// requiring higher precision than inferred. For deterministic precision behavior,
727/// provide explicit `price_precision` and `size_precision` parameters.
728///
729/// # Errors
730///
731/// Returns an error if the file cannot be opened, read, or parsed as CSV.
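///
/// # Example
///
/// A minimal usage sketch; the file path is illustrative and the import path assumes
/// this function is re-exported from the crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_quotes;
///
/// // Let both precisions be inferred from a sample of the file.
/// let stream = stream_quotes("quotes.csv", 100_000, None, None, None, None).unwrap();
/// for chunk in stream {
///     for quote in chunk.unwrap() {
///         // Each item is a fully parsed QuoteTick
///         let _ = quote;
///     }
/// }
/// ```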
732pub fn stream_quotes<P: AsRef<Path>>(
733    filepath: P,
734    chunk_size: usize,
735    price_precision: Option<u8>,
736    size_precision: Option<u8>,
737    instrument_id: Option<InstrumentId>,
738    limit: Option<usize>,
739) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
740    QuoteStreamIterator::new(
741        filepath,
742        chunk_size,
743        price_precision,
744        size_precision,
745        instrument_id,
746        limit,
747    )
748}
749
750////////////////////////////////////////////////////////////////////////////////
751// Trade Streaming
752////////////////////////////////////////////////////////////////////////////////
753
754/// An iterator for streaming [`TradeTick`]s from a Tardis CSV file in chunks.
755struct TradeStreamIterator {
756    reader: Reader<Box<dyn Read>>,
757    record: StringRecord,
758    buffer: Vec<TradeTick>,
759    chunk_size: usize,
760    instrument_id: Option<InstrumentId>,
761    price_precision: u8,
762    size_precision: u8,
763    limit: Option<usize>,
764    records_processed: usize,
765}
766
767impl TradeStreamIterator {
768    /// Creates a new [`TradeStreamIterator`].
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the file cannot be opened or read.
773    pub fn new<P: AsRef<Path>>(
774        filepath: P,
775        chunk_size: usize,
776        price_precision: Option<u8>,
777        size_precision: Option<u8>,
778        instrument_id: Option<InstrumentId>,
779        limit: Option<usize>,
780    ) -> anyhow::Result<Self> {
781        let (final_price_precision, final_size_precision) =
782            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
783                // Both precisions provided, use them directly
784                (price_prec, size_prec)
785            } else {
786                // One or both precisions missing, detect only the missing ones
787                let mut reader = create_csv_reader(&filepath)?;
788                let mut record = StringRecord::new();
789                let (detected_price, detected_size) =
790                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
791                (
792                    price_precision.unwrap_or(detected_price),
793                    size_precision.unwrap_or(detected_size),
794                )
795            };
796
797        let reader = create_csv_reader(filepath)?;
798
799        Ok(Self {
800            reader,
801            record: StringRecord::new(),
802            buffer: Vec::with_capacity(chunk_size),
803            chunk_size,
804            instrument_id,
805            price_precision: final_price_precision,
806            size_precision: final_size_precision,
807            limit,
808            records_processed: 0,
809        })
810    }
811
812    fn detect_precision_from_sample(
813        reader: &mut Reader<Box<dyn std::io::Read>>,
814        record: &mut StringRecord,
815        sample_size: usize,
816    ) -> anyhow::Result<(u8, u8)> {
817        let mut max_price_precision = 2u8;
818        let mut max_size_precision = 0u8;
819        let mut records_scanned = 0;
820
821        while records_scanned < sample_size {
822            match reader.read_record(record) {
823                Ok(true) => {
824                    if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
825                        max_price_precision = max_price_precision.max(infer_precision(data.price));
826                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
827                        records_scanned += 1;
828                    }
829                }
830                Ok(false) => break,             // End of file
831                Err(_) => records_scanned += 1, // Skip malformed records
832            }
833        }
834
835        Ok((max_price_precision, max_size_precision))
836    }
837}
838
839impl Iterator for TradeStreamIterator {
840    type Item = anyhow::Result<Vec<TradeTick>>;
841
842    fn next(&mut self) -> Option<Self::Item> {
843        if let Some(limit) = self.limit
844            && self.records_processed >= limit
845        {
846            return None;
847        }
848
849        self.buffer.clear();
850        let mut records_read = 0;
851
852        while records_read < self.chunk_size {
853            match self.reader.read_record(&mut self.record) {
854                Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
855                    Ok(data) => {
856                        let size = Quantity::new(data.amount, self.size_precision);
857
858                        if size.is_positive() {
859                            let trade = parse_trade_record(
860                                &data,
861                                size,
862                                self.price_precision,
863                                self.instrument_id,
864                            );
865
866                            self.buffer.push(trade);
867                            records_read += 1;
868                            self.records_processed += 1;
869
870                            if let Some(limit) = self.limit
871                                && self.records_processed >= limit
872                            {
873                                break;
874                            }
875                        } else {
876                            log::warn!("Skipping zero-sized trade: {data:?}");
877                        }
878                    }
879                    Err(e) => {
880                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
881                    }
882                },
883                Ok(false) => {
884                    if self.buffer.is_empty() {
885                        return None;
886                    }
887                    return Some(Ok(self.buffer.clone()));
888                }
889                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
890            }
891        }
892
893        if self.buffer.is_empty() {
894            None
895        } else {
896            Some(Ok(self.buffer.clone()))
897        }
898    }
899}
900
901/// Streams [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
902/// yielding chunks of the specified size.
903///
904/// # Precision Inference Warning
905///
906/// When streaming with precision inference (not providing explicit precisions), the
907/// inferred precision may differ from bulk loading the entire file: inference is based
908/// on a sample of the first 10,000 records only, so later records may contain values
909/// requiring higher precision than inferred. For deterministic precision behavior,
910/// provide explicit `price_precision` and `size_precision` parameters.
911///
912/// # Errors
913///
914/// Returns an error if the file cannot be opened, read, or parsed as CSV.
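///
/// # Example
///
/// A minimal usage sketch (zero-sized trades are skipped with a warning); the file path
/// is illustrative and the import path assumes a re-export from the crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_trades;
///
/// // Limit the stream to the first 1,000,000 trades.
/// let stream = stream_trades("trades.csv", 100_000, None, None, None, Some(1_000_000)).unwrap();
/// for chunk in stream {
///     let trades = chunk.unwrap();
///     println!("chunk of {} trades", trades.len());
/// }
/// ```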
915pub fn stream_trades<P: AsRef<Path>>(
916    filepath: P,
917    chunk_size: usize,
918    price_precision: Option<u8>,
919    size_precision: Option<u8>,
920    instrument_id: Option<InstrumentId>,
921    limit: Option<usize>,
922) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
923    TradeStreamIterator::new(
924        filepath,
925        chunk_size,
926        price_precision,
927        size_precision,
928        instrument_id,
929        limit,
930    )
931}
932
933////////////////////////////////////////////////////////////////////////////////
934// Depth10 Streaming
935////////////////////////////////////////////////////////////////////////////////
936
937/// An iterator for streaming [`OrderBookDepth10`]s from a Tardis CSV file in chunks.
938struct Depth10StreamIterator {
939    reader: Reader<Box<dyn Read>>,
940    record: StringRecord,
941    buffer: Vec<OrderBookDepth10>,
942    chunk_size: usize,
943    levels: u8,
944    instrument_id: Option<InstrumentId>,
945    price_precision: u8,
946    size_precision: u8,
947    limit: Option<usize>,
948    records_processed: usize,
949}
950
951impl Depth10StreamIterator {
952    /// Creates a new [`Depth10StreamIterator`].
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if the file cannot be opened or read, or if `levels` is not 5 or 25.
957    pub fn new<P: AsRef<Path>>(
958        filepath: P,
959        chunk_size: usize,
960        levels: u8,
961        price_precision: Option<u8>,
962        size_precision: Option<u8>,
963        instrument_id: Option<InstrumentId>,
964        limit: Option<usize>,
965    ) -> anyhow::Result<Self> {
966        anyhow::ensure!(
967            levels == 5 || levels == 25,
968            "Invalid levels: {levels}. Must be 5 or 25."
969        );
970
971        let (final_price_precision, final_size_precision) =
972            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
973                // Both precisions provided, use them directly
974                (price_prec, size_prec)
975            } else {
976                // One or both precisions missing, detect only the missing ones
977                let mut reader = create_csv_reader(&filepath)?;
978                let mut record = StringRecord::new();
979                let (detected_price, detected_size) =
980                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000)?;
981                (
982                    price_precision.unwrap_or(detected_price),
983                    size_precision.unwrap_or(detected_size),
984                )
985            };
986
987        let reader = create_csv_reader(filepath)?;
988
989        Ok(Self {
990            reader,
991            record: StringRecord::new(),
992            buffer: Vec::with_capacity(chunk_size),
993            chunk_size,
994            levels,
995            instrument_id,
996            price_precision: final_price_precision,
997            size_precision: final_size_precision,
998            limit,
999            records_processed: 0,
1000        })
1001    }
1002
1003    fn process_snapshot5(&mut self, data: TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
1004        let instrument_id = self
1005            .instrument_id
1006            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
1007
1008        let mut bids = [NULL_ORDER; DEPTH10_LEN];
1009        let mut asks = [NULL_ORDER; DEPTH10_LEN];
1010        let mut bid_counts = [0_u32; DEPTH10_LEN];
1011        let mut ask_counts = [0_u32; DEPTH10_LEN];
1012
1013        // Process first 5 levels from snapshot5 data
1014        for i in 0..5 {
1015            let (bid_price, bid_amount) = match i {
1016                0 => (data.bids_0_price, data.bids_0_amount),
1017                1 => (data.bids_1_price, data.bids_1_amount),
1018                2 => (data.bids_2_price, data.bids_2_amount),
1019                3 => (data.bids_3_price, data.bids_3_amount),
1020                4 => (data.bids_4_price, data.bids_4_amount),
1021                _ => unreachable!(),
1022            };
1023
1024            let (ask_price, ask_amount) = match i {
1025                0 => (data.asks_0_price, data.asks_0_amount),
1026                1 => (data.asks_1_price, data.asks_1_amount),
1027                2 => (data.asks_2_price, data.asks_2_amount),
1028                3 => (data.asks_3_price, data.asks_3_amount),
1029                4 => (data.asks_4_price, data.asks_4_amount),
1030                _ => unreachable!(),
1031            };
1032
1033            let (bid_order, bid_count) = create_book_order(
1034                OrderSide::Buy,
1035                bid_price,
1036                bid_amount,
1037                self.price_precision,
1038                self.size_precision,
1039            );
1040            bids[i] = bid_order;
1041            bid_counts[i] = bid_count;
1042
1043            let (ask_order, ask_count) = create_book_order(
1044                OrderSide::Sell,
1045                ask_price,
1046                ask_amount,
1047                self.price_precision,
1048                self.size_precision,
1049            );
1050            asks[i] = ask_order;
1051            ask_counts[i] = ask_count;
1052        }
1053
1054        let flags = RecordFlag::F_SNAPSHOT.value();
1055        let sequence = 0;
1056        let ts_event = parse_timestamp(data.timestamp);
1057        let ts_init = parse_timestamp(data.local_timestamp);
1058
1059        OrderBookDepth10::new(
1060            instrument_id,
1061            bids,
1062            asks,
1063            bid_counts,
1064            ask_counts,
1065            flags,
1066            sequence,
1067            ts_event,
1068            ts_init,
1069        )
1070    }
1071
1072    fn process_snapshot25(&mut self, data: TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
1073        let instrument_id = self
1074            .instrument_id
1075            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
1076
1077        let mut bids = [NULL_ORDER; DEPTH10_LEN];
1078        let mut asks = [NULL_ORDER; DEPTH10_LEN];
1079        let mut bid_counts = [0_u32; DEPTH10_LEN];
1080        let mut ask_counts = [0_u32; DEPTH10_LEN];
1081
1082        // Process first 10 levels from snapshot25 data
1083        for i in 0..DEPTH10_LEN {
1084            let (bid_price, bid_amount) = match i {
1085                0 => (data.bids_0_price, data.bids_0_amount),
1086                1 => (data.bids_1_price, data.bids_1_amount),
1087                2 => (data.bids_2_price, data.bids_2_amount),
1088                3 => (data.bids_3_price, data.bids_3_amount),
1089                4 => (data.bids_4_price, data.bids_4_amount),
1090                5 => (data.bids_5_price, data.bids_5_amount),
1091                6 => (data.bids_6_price, data.bids_6_amount),
1092                7 => (data.bids_7_price, data.bids_7_amount),
1093                8 => (data.bids_8_price, data.bids_8_amount),
1094                9 => (data.bids_9_price, data.bids_9_amount),
1095                _ => unreachable!(),
1096            };
1097
1098            let (ask_price, ask_amount) = match i {
1099                0 => (data.asks_0_price, data.asks_0_amount),
1100                1 => (data.asks_1_price, data.asks_1_amount),
1101                2 => (data.asks_2_price, data.asks_2_amount),
1102                3 => (data.asks_3_price, data.asks_3_amount),
1103                4 => (data.asks_4_price, data.asks_4_amount),
1104                5 => (data.asks_5_price, data.asks_5_amount),
1105                6 => (data.asks_6_price, data.asks_6_amount),
1106                7 => (data.asks_7_price, data.asks_7_amount),
1107                8 => (data.asks_8_price, data.asks_8_amount),
1108                9 => (data.asks_9_price, data.asks_9_amount),
1109                _ => unreachable!(),
1110            };
1111
1112            let (bid_order, bid_count) = create_book_order(
1113                OrderSide::Buy,
1114                bid_price,
1115                bid_amount,
1116                self.price_precision,
1117                self.size_precision,
1118            );
1119            bids[i] = bid_order;
1120            bid_counts[i] = bid_count;
1121
1122            let (ask_order, ask_count) = create_book_order(
1123                OrderSide::Sell,
1124                ask_price,
1125                ask_amount,
1126                self.price_precision,
1127                self.size_precision,
1128            );
1129            asks[i] = ask_order;
1130            ask_counts[i] = ask_count;
1131        }
1132
1133        let flags = RecordFlag::F_SNAPSHOT.value();
1134        let sequence = 0;
1135        let ts_event = parse_timestamp(data.timestamp);
1136        let ts_init = parse_timestamp(data.local_timestamp);
1137
1138        OrderBookDepth10::new(
1139            instrument_id,
1140            bids,
1141            asks,
1142            bid_counts,
1143            ask_counts,
1144            flags,
1145            sequence,
1146            ts_event,
1147            ts_init,
1148        )
1149    }
1150
1151    fn detect_precision_from_sample(
1152        reader: &mut Reader<Box<dyn std::io::Read>>,
1153        record: &mut StringRecord,
1154        sample_size: usize,
1155    ) -> anyhow::Result<(u8, u8)> {
1156        let mut max_price_precision = 2u8;
1157        let mut max_size_precision = 0u8;
1158        let mut records_scanned = 0;
1159
1160        while records_scanned < sample_size {
1161            match reader.read_record(record) {
1162                Ok(true) => {
1163                    // Try to deserialize as snapshot5 record first
1164                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
1165                        if let Some(bid_price) = data.bids_0_price {
1166                            max_price_precision =
1167                                max_price_precision.max(infer_precision(bid_price));
1168                        }
1169                        if let Some(ask_price) = data.asks_0_price {
1170                            max_price_precision =
1171                                max_price_precision.max(infer_precision(ask_price));
1172                        }
1173                        if let Some(bid_amount) = data.bids_0_amount {
1174                            max_size_precision =
1175                                max_size_precision.max(infer_precision(bid_amount));
1176                        }
1177                        if let Some(ask_amount) = data.asks_0_amount {
1178                            max_size_precision =
1179                                max_size_precision.max(infer_precision(ask_amount));
1180                        }
1181                        records_scanned += 1;
1182                    } else if let Ok(data) =
1183                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
1184                    {
1185                        if let Some(bid_price) = data.bids_0_price {
1186                            max_price_precision =
1187                                max_price_precision.max(infer_precision(bid_price));
1188                        }
1189                        if let Some(ask_price) = data.asks_0_price {
1190                            max_price_precision =
1191                                max_price_precision.max(infer_precision(ask_price));
1192                        }
1193                        if let Some(bid_amount) = data.bids_0_amount {
1194                            max_size_precision =
1195                                max_size_precision.max(infer_precision(bid_amount));
1196                        }
1197                        if let Some(ask_amount) = data.asks_0_amount {
1198                            max_size_precision =
1199                                max_size_precision.max(infer_precision(ask_amount));
1200                        }
1201                        records_scanned += 1;
1202                    }
1203                }
1204                Ok(false) => break,             // End of file
1205                Err(_) => records_scanned += 1, // Skip malformed records
1206            }
1207        }
1208
1209        Ok((max_price_precision, max_size_precision))
1210    }
1211}
1212
1213impl Iterator for Depth10StreamIterator {
1214    type Item = anyhow::Result<Vec<OrderBookDepth10>>;
1215
1216    fn next(&mut self) -> Option<Self::Item> {
1217        if let Some(limit) = self.limit
1218            && self.records_processed >= limit
1219        {
1220            return None;
1221        }
1222
1223        if !self.buffer.is_empty() {
1224            let chunk = self.buffer.split_off(0);
1225            return Some(Ok(chunk));
1226        }
1227
1228        self.buffer.clear();
1229        let mut records_read = 0;
1230
1231        while records_read < self.chunk_size {
1232            match self.reader.read_record(&mut self.record) {
1233                Ok(true) => {
1234                    let result = match self.levels {
1235                        5 => self
1236                            .record
1237                            .deserialize::<TardisOrderBookSnapshot5Record>(None)
1238                            .map(|data| self.process_snapshot5(data)),
1239                        25 => self
1240                            .record
1241                            .deserialize::<TardisOrderBookSnapshot25Record>(None)
1242                            .map(|data| self.process_snapshot25(data)),
1243                        _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
1244                    };
1245
1246                    match result {
1247                        Ok(depth) => {
1248                            self.buffer.push(depth);
1249                            records_read += 1;
1250                            self.records_processed += 1;
1251
1252                            if let Some(limit) = self.limit
1253                                && self.records_processed >= limit
1254                            {
1255                                break;
1256                            }
1257                        }
1258                        Err(e) => {
1259                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
1260                        }
1261                    }
1262                }
1263                Ok(false) => {
1264                    if self.buffer.is_empty() {
1265                        return None;
1266                    }
1267                    let chunk = self.buffer.split_off(0);
1268                    return Some(Ok(chunk));
1269                }
1270                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1271            }
1272        }
1273
1274        if self.buffer.is_empty() {
1275            None
1276        } else {
1277            let chunk = self.buffer.split_off(0);
1278            Some(Ok(chunk))
1279        }
1280    }
1281}
1282
1283/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
1284/// yielding chunks of the specified size.
1285///
1286/// # Precision Inference Warning
1287///
1288/// When streaming with precision inference (not providing explicit precisions), the
1289/// inferred precision may differ from bulk loading the entire file: inference is based
1290/// on a sample of the first 10,000 records only, so later records may contain values
1291/// requiring higher precision than inferred. For deterministic precision behavior,
1292/// provide explicit `price_precision` and `size_precision` parameters.
1293///
1294/// # Errors
1295///
1296/// Returns an error if the file cannot be opened, read, or parsed as CSV.
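///
/// # Example
///
/// A minimal usage sketch; only the top 5 levels are populated from snapshot5 records,
/// the remaining levels are left as null orders. The file path is illustrative and the
/// import path assumes a re-export from the crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_depth10_from_snapshot5;
///
/// let stream =
///     stream_depth10_from_snapshot5("book_snapshot_5.csv", 10_000, None, None, None, None).unwrap();
/// for chunk in stream {
///     let depths = chunk.unwrap();
///     println!("chunk of {} depth updates", depths.len());
/// }
/// ```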
1297pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
1298    filepath: P,
1299    chunk_size: usize,
1300    price_precision: Option<u8>,
1301    size_precision: Option<u8>,
1302    instrument_id: Option<InstrumentId>,
1303    limit: Option<usize>,
1304) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1305    Depth10StreamIterator::new(
1306        filepath,
1307        chunk_size,
1308        5,
1309        price_precision,
1310        size_precision,
1311        instrument_id,
1312        limit,
1313    )
1314}
1315
1316/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
1317/// yielding chunks of the specified size.
1318///
1319/// # Precision Inference Warning
1320///
1321/// When streaming with precision inference (not providing explicit precisions), the
1322/// inferred precision may differ from bulk loading the entire file: inference is based
1323/// on a sample of the first 10,000 records only, so later records may contain values
1324/// requiring higher precision than inferred. For deterministic precision behavior,
1325/// provide explicit `price_precision` and `size_precision` parameters.
1326///
1327/// # Errors
1328///
1329/// Returns an error if the file cannot be opened, read, or parsed as CSV.
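///
/// # Example
///
/// A minimal usage sketch; only the top 10 of the 25 levels in each record are used.
/// The file path is illustrative and the import path assumes a re-export from the
/// crate's `csv` module.
///
/// ```ignore
/// use nautilus_tardis::csv::stream_depth10_from_snapshot25;
///
/// let stream =
///     stream_depth10_from_snapshot25("book_snapshot_25.csv", 10_000, None, None, None, None).unwrap();
/// for chunk in stream {
///     let depths = chunk.unwrap();
///     println!("chunk of {} depth updates", depths.len());
/// }
/// ```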
1330pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
1331    filepath: P,
1332    chunk_size: usize,
1333    price_precision: Option<u8>,
1334    size_precision: Option<u8>,
1335    instrument_id: Option<InstrumentId>,
1336    limit: Option<usize>,
1337) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1338    Depth10StreamIterator::new(
1339        filepath,
1340        chunk_size,
1341        25,
1342        price_precision,
1343        size_precision,
1344        instrument_id,
1345        limit,
1346    )
1347}
1348
1349////////////////////////////////////////////////////////////////////////////////
1350// FundingRateUpdate Streaming
1351////////////////////////////////////////////////////////////////////////////////
1352
1353use nautilus_model::data::FundingRateUpdate;
1354
1355use crate::csv::record::TardisDerivativeTickerRecord;
1356
1357/// An iterator for streaming [`FundingRateUpdate`]s from a Tardis CSV file in chunks.
1358struct FundingRateStreamIterator {
1359    reader: Reader<Box<dyn Read>>,
1360    record: StringRecord,
1361    buffer: Vec<FundingRateUpdate>,
1362    chunk_size: usize,
1363    instrument_id: Option<InstrumentId>,
1364    limit: Option<usize>,
1365    records_processed: usize,
1366}
1367
1368impl FundingRateStreamIterator {
1369    /// Creates a new [`FundingRateStreamIterator`].
1370    ///
1371    /// # Errors
1372    ///
1373    /// Returns an error if the file cannot be opened or read.
1374    fn new<P: AsRef<Path>>(
1375        filepath: P,
1376        chunk_size: usize,
1377        instrument_id: Option<InstrumentId>,
1378        limit: Option<usize>,
1379    ) -> anyhow::Result<Self> {
1380        let reader = create_csv_reader(filepath)?;
1381
1382        Ok(Self {
1383            reader,
1384            record: StringRecord::new(),
1385            buffer: Vec::with_capacity(chunk_size),
1386            chunk_size,
1387            instrument_id,
1388            limit,
1389            records_processed: 0,
1390        })
1391    }
1392}
1393
1394impl Iterator for FundingRateStreamIterator {
1395    type Item = anyhow::Result<Vec<FundingRateUpdate>>;
1396
1397    fn next(&mut self) -> Option<Self::Item> {
1398        if let Some(limit) = self.limit
1399            && self.records_processed >= limit
1400        {
1401            return None;
1402        }
1403
1404        if !self.buffer.is_empty() {
1405            let chunk = self.buffer.split_off(0);
1406            return Some(Ok(chunk));
1407        }
1408
1409        self.buffer.clear();
1410        let mut records_read = 0;
1411
1412        while records_read < self.chunk_size {
1413            match self.reader.read_record(&mut self.record) {
1414                Ok(true) => {
1415                    let result = self
1416                        .record
1417                        .deserialize::<TardisDerivativeTickerRecord>(None)
1418                        .map_err(anyhow::Error::from)
1419                        .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));
1420
1421                    match result {
1422                        Ok(Some(funding_rate)) => {
1423                            self.buffer.push(funding_rate);
1424                            records_read += 1;
1425                            self.records_processed += 1;
1426
1427                            if let Some(limit) = self.limit
1428                                && self.records_processed >= limit
1429                            {
1430                                break;
1431                            }
1432                        }
1433                        Ok(None) => {
1434                            // Skip records with no funding data; they still count toward the limit
1435                            self.records_processed += 1;
1436                        }
1437                        Err(e) => {
1438                            return Some(Err(anyhow::anyhow!(
1439                                "Failed to parse funding rate record: {e}"
1440                            )));
1441                        }
1442                    }
1443                }
1444                Ok(false) => {
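                    // EOF: yield any remaining buffered updates, or end the stream.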
1445                    if self.buffer.is_empty() {
1446                        return None;
1447                    }
1448                    let chunk = self.buffer.split_off(0);
1449                    return Some(Ok(chunk));
1450                }
1451                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1452            }
1453        }
1454
1455        if self.buffer.is_empty() {
1456            None
1457        } else {
1458            let chunk = self.buffer.split_off(0);
1459            Some(Ok(chunk))
1460        }
1461    }
1462}
1463
1464/// Streams [`FundingRateUpdate`]s from a Tardis derivative ticker CSV file,
1465/// yielding chunks of the specified size.
1466///
1467/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
1468/// fields from derivative ticker data to create funding rate updates.
1469///
1470/// # Errors
1471///
1472/// Returns an error if the file cannot be opened, read, or parsed as CSV.
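///
/// # Example
///
/// A minimal usage sketch (marked `ignore`): the module path and file name are
/// assumptions for illustration and may not match this crate's actual re-exports.
///
/// ```ignore
/// use nautilus_tardis::csv::stream::stream_funding_rates;
///
/// let stream = stream_funding_rates(
///     "derivative_ticker.csv", // hypothetical Tardis derivative ticker export
///     10_000,                  // chunk size (funding updates per yielded Vec)
///     None,                    // derive the instrument ID from the exchange/symbol columns
///     None,                    // no record limit
/// )?;
///
/// for chunk in stream {
///     for update in chunk? {
///         println!("{update:?}");
///     }
/// }
/// ```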
1473pub fn stream_funding_rates<P: AsRef<Path>>(
1474    filepath: P,
1475    chunk_size: usize,
1476    instrument_id: Option<InstrumentId>,
1477    limit: Option<usize>,
1478) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
1479    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484    use nautilus_model::{
1485        enums::{AggressorSide, BookAction},
1486        identifiers::TradeId,
1487        types::Price,
1488    };
1489    use rstest::*;
1490
1491    use super::*;
1492    use crate::{common::testing::get_test_data_path, csv::load::load_deltas, parse::parse_price};
1493
1494    #[rstest]
1495    #[case(0.0, 0)]
1496    #[case(42.0, 0)]
1497    #[case(0.1, 1)]
1498    #[case(0.25, 2)]
1499    #[case(123.0001, 4)]
1500    #[case(-42.987654321, 9)]
1501    #[case(1.234_567_890_123, 12)]
1502    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
1503        assert_eq!(infer_precision(input), expected);
1504    }
1505
1506    #[rstest]
1507    pub fn test_stream_deltas_chunked() {
1508        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1509binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1510binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1511binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1512binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1513binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1514
1515        let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
1516        std::fs::write(&temp_file, csv_data).unwrap();
1517
1518        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1519        let chunks: Vec<_> = stream.collect();
1520
1521        // 5 data rows + 1 CLEAR = 6 deltas, in chunks of 2
1522        assert_eq!(chunks.len(), 3);
1523
1524        let chunk1 = chunks[0].as_ref().unwrap();
1525        assert_eq!(chunk1.len(), 2);
1526        assert_eq!(chunk1[0].action, BookAction::Clear); // CLEAR first
1527        assert_eq!(chunk1[1].order.price.precision, 4); // First data delta
1528
1529        let chunk2 = chunks[1].as_ref().unwrap();
1530        assert_eq!(chunk2.len(), 2);
1531        assert_eq!(chunk2[0].order.price.precision, 4);
1532        assert_eq!(chunk2[1].order.price.precision, 4);
1533
1534        let chunk3 = chunks[2].as_ref().unwrap();
1535        assert_eq!(chunk3.len(), 2);
1536        assert_eq!(chunk3[0].order.price.precision, 4);
1537        assert_eq!(chunk3[1].order.price.precision, 4);
1538
1539        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1540        assert_eq!(total_deltas, 6);
1541
1542        std::fs::remove_file(&temp_file).ok();
1543    }
1544
1545    #[cfg(feature = "python")]
1546    #[rstest]
1547    pub fn test_stream_batched_deltas_clear_and_limit() {
1548        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1549binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1550binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1551binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1552binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1553binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1554
1555        let temp_file = std::env::temp_dir().join("test_stream_batched_deltas.csv");
1556        std::fs::write(&temp_file, csv_data).unwrap();
1557
1558        // limit=1 should return only the synthetic CLEAR delta
1559        let mut iterator =
1560            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, Some(1))
1561                .unwrap();
1562        iterator.fill_pending_batches().transpose().unwrap();
1563        assert_eq!(iterator.pending_batches.len(), 1);
1564        assert_eq!(iterator.pending_batches[0].len(), 1);
1565        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);
1566
1567        // No limit should return all batches (first batch starts with CLEAR)
1568        let mut iterator =
1569            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, None).unwrap();
1570        iterator.fill_pending_batches().transpose().unwrap();
1571        assert_eq!(iterator.pending_batches.len(), 5);
1572        assert_eq!(iterator.pending_batches[0].len(), 2);
1573        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);
1574        assert_ne!(iterator.pending_batches[0][1].action, BookAction::Clear);
1575        let total_deltas: usize = iterator
1576            .pending_batches
1577            .iter()
1578            .map(|batch| batch.len())
1579            .sum();
1580        assert_eq!(total_deltas, 6);
1581
1582        std::fs::remove_file(&temp_file).ok();
1583    }
1584
1585    #[cfg(feature = "python")]
1586    #[rstest]
1587    pub fn test_stream_batched_deltas_with_mid_snapshot_inserts_clear() {
1588        // CSV with:
1589        // - Initial snapshot (is_snapshot=true) at start
1590        // - Some deltas (is_snapshot=false)
1591        // - Mid-day snapshot (is_snapshot=true) - should trigger CLEAR
1592        // - Back to deltas (is_snapshot=false)
1593        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1594binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1595binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1596binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1597binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1598binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
1599binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
1600binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";
1601
1602        let temp_file = std::env::temp_dir().join("test_stream_batched_mid_snapshot.csv");
1603        std::fs::write(&temp_file, csv_data).unwrap();
1604
1605        let mut iterator =
1606            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
1607                .unwrap();
1608        iterator.fill_pending_batches().transpose().unwrap();
1609
1610        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
1611        let clear_count = all_deltas
1612            .iter()
1613            .filter(|d| d.action == BookAction::Clear)
1614            .count();
1615
1616        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
1617        assert_eq!(
1618            clear_count, 2,
1619            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
1620        );
1621
1622        // Verify CLEAR positions:
1623        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=CLEAR, 6=Add, 7=Add, 8=Update
1624        assert_eq!(all_deltas[0].action, BookAction::Clear);
1625        assert_eq!(all_deltas[5].action, BookAction::Clear);
1626
1627        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
1628        assert_eq!(
1629            all_deltas[0].flags & RecordFlag::F_LAST.value(),
1630            0,
1631            "CLEAR at index 0 should not have F_LAST flag"
1632        );
1633        assert_eq!(
1634            all_deltas[5].flags & RecordFlag::F_LAST.value(),
1635            0,
1636            "CLEAR at index 5 should not have F_LAST flag"
1637        );
1638
1639        std::fs::remove_file(&temp_file).ok();
1640    }
1641
1642    #[cfg(feature = "python")]
1643    #[rstest]
1644    pub fn test_stream_batched_deltas_limit_includes_clear() {
1645        // Test that limit counts total emitted deltas (including CLEARs)
1646        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1647binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1648binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1649binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1650binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
1651binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,1.0";
1652
1653        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_includes_clear.csv");
1654        std::fs::write(&temp_file, csv_data).unwrap();
1655
1656        let mut iterator =
1657            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(4))
1658                .unwrap();
1659        iterator.fill_pending_batches().transpose().unwrap();
1660
1661        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
1662
1663        // limit=4 should get exactly 4 deltas: 1 CLEAR + 3 data deltas
1664        assert_eq!(all_deltas.len(), 4);
1665        assert_eq!(all_deltas[0].action, BookAction::Clear);
1666        assert_eq!(all_deltas[1].action, BookAction::Add);
1667        assert_eq!(all_deltas[2].action, BookAction::Update);
1668        assert_eq!(all_deltas[3].action, BookAction::Update);
1669
1670        std::fs::remove_file(&temp_file).ok();
1671    }
1672
1673    #[cfg(feature = "python")]
1674    #[rstest]
1675    pub fn test_stream_batched_deltas_limit_sets_f_last() {
1676        // Test that F_LAST is set on the final delta when limit is reached
1677        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1678binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1679binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
1680binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,0.5
1681binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,1.5";
1682
1683        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_f_last.csv");
1684        std::fs::write(&temp_file, csv_data).unwrap();
1685
1686        // limit=3 should get 3 deltas with F_LAST on the last one
1687        let mut iterator =
1688            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(3))
1689                .unwrap();
1690        iterator.fill_pending_batches().transpose().unwrap();
1691
1692        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
1693
1694        assert_eq!(all_deltas.len(), 3);
1695        assert_eq!(
1696            all_deltas[2].flags & RecordFlag::F_LAST.value(),
1697            RecordFlag::F_LAST.value(),
1698            "Final delta should have F_LAST flag when limit is reached"
1699        );
1700
1701        std::fs::remove_file(&temp_file).ok();
1702    }
1703
1704    #[cfg(feature = "python")]
1705    #[rstest]
1706    pub fn test_stream_batched_deltas_snapshot_batch_flags() {
1707        // Test that CLEAR is first in batch and only the last delta has F_LAST
1708        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1709binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1710binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1711binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5";
1712
1713        let temp_file = std::env::temp_dir().join("test_stream_batched_snapshot_batch_flags.csv");
1714        std::fs::write(&temp_file, csv_data).unwrap();
1715
1716        let mut iterator =
1717            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
1718                .unwrap();
1719        iterator.fill_pending_batches().transpose().unwrap();
1720
1721        assert_eq!(iterator.pending_batches.len(), 2);
1722        let first_batch = &iterator.pending_batches[0];
1723
1724        // First batch contains CLEAR + 2 snapshot deltas
1725        assert_eq!(first_batch.len(), 3);
1726        assert_eq!(first_batch[0].action, BookAction::Clear);
1727        assert_eq!(first_batch[0].flags & RecordFlag::F_LAST.value(), 0);
1728        assert_eq!(first_batch[1].flags & RecordFlag::F_LAST.value(), 0);
1729        assert_eq!(
1730            first_batch[2].flags & RecordFlag::F_LAST.value(),
1731            RecordFlag::F_LAST.value()
1732        );
1733
1734        // Second batch should have F_LAST set (end of file)
1735        assert_eq!(iterator.pending_batches[1].len(), 1);
1736        assert_eq!(
1737            iterator.pending_batches[1][0].flags & RecordFlag::F_LAST.value(),
1738            RecordFlag::F_LAST.value()
1739        );
1740
1741        std::fs::remove_file(&temp_file).ok();
1742    }
1743
1744    #[rstest]
1745    pub fn test_stream_quotes_chunked() {
1746        let csv_data =
1747            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
1748binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
1749binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
1750binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
1751binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
1752binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";
1753
1754        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
1755        std::fs::write(&temp_file, csv_data).unwrap();
1756
1757        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1758        let chunks: Vec<_> = stream.collect();
1759
1760        assert_eq!(chunks.len(), 3);
1761
1762        let chunk1 = chunks[0].as_ref().unwrap();
1763        assert_eq!(chunk1.len(), 2);
1764        assert_eq!(chunk1[0].bid_price.precision, 4);
1765        assert_eq!(chunk1[1].bid_price.precision, 4);
1766
1767        let chunk2 = chunks[1].as_ref().unwrap();
1768        assert_eq!(chunk2.len(), 2);
1769        assert_eq!(chunk2[0].bid_price.precision, 4);
1770        assert_eq!(chunk2[1].bid_price.precision, 4);
1771
1772        let chunk3 = chunks[2].as_ref().unwrap();
1773        assert_eq!(chunk3.len(), 1);
1774        assert_eq!(chunk3[0].bid_price.precision, 4);
1775
1776        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1777        assert_eq!(total_quotes, 5);
1778
1779        std::fs::remove_file(&temp_file).ok();
1780    }
1781
1782    #[rstest]
1783    pub fn test_stream_trades_chunked() {
1784        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1785binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1786binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
1787binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1788binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
1789binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
1790
1791        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
1792        std::fs::write(&temp_file, csv_data).unwrap();
1793
1794        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1795        let chunks: Vec<_> = stream.collect();
1796
1797        assert_eq!(chunks.len(), 2);
1798
1799        let chunk1 = chunks[0].as_ref().unwrap();
1800        assert_eq!(chunk1.len(), 3);
1801        assert_eq!(chunk1[0].price.precision, 4);
1802        assert_eq!(chunk1[1].price.precision, 4);
1803        assert_eq!(chunk1[2].price.precision, 4);
1804
1805        let chunk2 = chunks[1].as_ref().unwrap();
1806        assert_eq!(chunk2.len(), 2);
1807        assert_eq!(chunk2[0].price.precision, 4);
1808        assert_eq!(chunk2[1].price.precision, 4);
1809
1810        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
1811        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);
1812
1813        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1814        assert_eq!(total_trades, 5);
1815
1816        std::fs::remove_file(&temp_file).ok();
1817    }
1818
1819    #[rstest]
1820    pub fn test_stream_trades_with_zero_sized_trade() {
1821        // Test CSV data with one zero-sized trade that should be skipped
1822        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1823binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1824binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1825binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1826binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1827
1828        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
1829        std::fs::write(&temp_file, csv_data).unwrap();
1830
1831        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
1832        let chunks: Vec<_> = stream.collect();
1833
1834        // Should have 1 chunk with 3 valid trades (zero-sized trade skipped)
1835        assert_eq!(chunks.len(), 1);
1836
1837        let chunk1 = chunks[0].as_ref().unwrap();
1838        assert_eq!(chunk1.len(), 3);
1839
1840        // Verify the trades are the correct ones (not the zero-sized one)
1841        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
1842        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
1843        assert_eq!(chunk1[2].size, Quantity::from("3.0"));
1844
1845        // Verify trade IDs to confirm correct trades were loaded
1846        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
1847        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
1848        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));
1849
1850        std::fs::remove_file(&temp_file).ok();
1851    }
1852
1853    #[rstest]
1854    pub fn test_stream_depth10_from_snapshot5_chunked() {
1855        let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1856binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
1857binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
1858binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";
1859
1860        // Write to temporary file
1861        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
1862        std::fs::write(&temp_file, csv_data).unwrap();
1863
1864        // Stream with chunk size of 2
1865        let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
1866        let chunks: Vec<_> = stream.collect();
1867
1868        // Should have 2 chunks: [2 items, 1 item]
1869        assert_eq!(chunks.len(), 2);
1870
1871        // First chunk: 2 depth snapshots
1872        let chunk1 = chunks[0].as_ref().unwrap();
1873        assert_eq!(chunk1.len(), 2);
1874
1875        // Second chunk: 1 depth snapshot
1876        let chunk2 = chunks[1].as_ref().unwrap();
1877        assert_eq!(chunk2.len(), 1);
1878
1879        // Verify depth structure
1880        let first_depth = &chunk1[0];
1881        assert_eq!(first_depth.bids.len(), 10); // Should have 10 levels
1882        assert_eq!(first_depth.asks.len(), 10);
1883
1884        // Verify some specific prices
1885        assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
1886        assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));
1887
1888        // Verify total count
1889        let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1890        assert_eq!(total_depths, 3);
1891
1892        // Clean up
1893        std::fs::remove_file(&temp_file).ok();
1894    }
1895
1896    #[rstest]
1897    pub fn test_stream_depth10_from_snapshot25_chunked() {
1898        // Create minimal snapshot25 CSV data (only the first few levels populated for testing)
1899        let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];
1900
1901        // Add bid and ask levels (we'll only populate first few for testing)
1902        let mut bid_headers = Vec::new();
1903        let mut ask_headers = Vec::new();
1904        for i in 0..25 {
1905            bid_headers.push(format!("bids[{i}].price"));
1906            bid_headers.push(format!("bids[{i}].amount"));
1907        }
1908        for i in 0..25 {
1909            ask_headers.push(format!("asks[{i}].price"));
1910            ask_headers.push(format!("asks[{i}].amount"));
1911        }
1912
1913        for header in &bid_headers {
1914            header_parts.push(header);
1915        }
1916        for header in &ask_headers {
1917            header_parts.push(header);
1918        }
1919
1920        let header = header_parts.join(",");
1921
1922        // Create a row with data for first 5 levels (rest will be empty)
1923        let mut row1_parts = vec![
1924            "binance".to_string(),
1925            "BTCUSDT".to_string(),
1926            "1640995200000000".to_string(),
1927            "1640995200100000".to_string(),
1928        ];
1929
1930        // Add bid levels (first 5 with data, rest empty)
1931        for i in 0..25 {
1932            if i < 5 {
1933                let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
1934                let bid_amount = 1.0 + f64::from(i);
1935                row1_parts.push(bid_price.to_string());
1936                row1_parts.push(bid_amount.to_string());
1937            } else {
1938                row1_parts.push(String::new());
1939                row1_parts.push(String::new());
1940            }
1941        }
1942
1943        // Add ask levels (first 5 with data, rest empty)
1944        for i in 0..25 {
1945            if i < 5 {
1946                let ask_price = f64::from(i).mul_add(0.01, 50000.0);
1947                let ask_amount = 1.0 + f64::from(i);
1948                row1_parts.push(ask_price.to_string());
1949                row1_parts.push(ask_amount.to_string());
1950            } else {
1951                row1_parts.push(String::new());
1952                row1_parts.push(String::new());
1953            }
1954        }
1955
1956        let csv_data = format!("{}\n{}", header, row1_parts.join(","));
1957
1958        // Write to temporary file
1959        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
1960        std::fs::write(&temp_file, &csv_data).unwrap();
1961
1962        // Stream with chunk size of 1
1963        let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
1964        let chunks: Vec<_> = stream.collect();
1965
1966        // Should have 1 chunk with 1 item
1967        assert_eq!(chunks.len(), 1);
1968
1969        let chunk1 = chunks[0].as_ref().unwrap();
1970        assert_eq!(chunk1.len(), 1);
1971
1972        // Verify depth structure
1973        let depth = &chunk1[0];
1974        assert_eq!(depth.bids.len(), 10); // Should have 10 levels
1975        assert_eq!(depth.asks.len(), 10);
1976
1977        // Verify the first level has data (exact values depend on the inferred precision)
1978        let actual_bid_price = depth.bids[0].price;
1979        let actual_ask_price = depth.asks[0].price;
1980        assert!(actual_bid_price.as_f64() > 0.0);
1981        assert!(actual_ask_price.as_f64() > 0.0);
1982
1983        // Clean up
1984        std::fs::remove_file(&temp_file).ok();
1985    }
1986
1987    #[rstest]
1988    pub fn test_stream_error_handling() {
1989        // Test with non-existent file
1990        let non_existent = std::path::Path::new("does_not_exist.csv");
1991
1992        let result = stream_deltas(non_existent, 10, None, None, None, None);
1993        assert!(result.is_err());
1994
1995        let result = stream_quotes(non_existent, 10, None, None, None, None);
1996        assert!(result.is_err());
1997
1998        let result = stream_trades(non_existent, 10, None, None, None, None);
1999        assert!(result.is_err());
2000
2001        let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
2002        assert!(result.is_err());
2003
2004        let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
2005        assert!(result.is_err());
2006    }
2007
2008    #[rstest]
2009    pub fn test_stream_empty_file() {
2010        // Test with empty CSV file
2011        let temp_file = std::env::temp_dir().join("test_empty.csv");
2012        std::fs::write(&temp_file, "").unwrap();
2013
2014        let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
2015        assert_eq!(stream.count(), 0);
2016
2017        // Clean up
2018        std::fs::remove_file(&temp_file).ok();
2019    }
2020
2021    #[rstest]
2022    pub fn test_stream_precision_consistency() {
2023        // Test that streaming produces the same deltas as bulk loading (precision may differ across chunks)
2024        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2025binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
2026binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
2027binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
2028binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";
2029
2030        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
2031        std::fs::write(&temp_file, csv_data).unwrap();
2032
2033        // Load all at once
2034        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
2035
2036        // Stream in chunks and collect
2037        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
2038        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2039
2040        // Should have same number of deltas
2041        assert_eq!(bulk_deltas.len(), streamed_deltas.len());
2042
2043        // Compare key properties (precision inference will be different due to chunking)
2044        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
2045            assert_eq!(bulk.instrument_id, streamed.instrument_id);
2046            assert_eq!(bulk.action, streamed.action);
2047            assert_eq!(bulk.order.side, streamed.order.side);
2048            assert_eq!(bulk.ts_event, streamed.ts_event);
2049            assert_eq!(bulk.ts_init, streamed.ts_init);
2050            // Note: precision may differ between bulk and streaming due to chunk boundaries
2051        }
2052
2053        // Clean up
2054        std::fs::remove_file(&temp_file).ok();
2055    }
2056
2057    #[rstest]
2058    pub fn test_stream_trades_from_local_file() {
2059        let filepath = get_test_data_path("csv/trades_1.csv");
2060        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();
2061
2062        let chunk1 = stream.next().unwrap().unwrap();
2063        assert_eq!(chunk1.len(), 1);
2064        assert_eq!(chunk1[0].price, Price::from("8531.5"));
2065
2066        let chunk2 = stream.next().unwrap().unwrap();
2067        assert_eq!(chunk2.len(), 1);
2068        assert_eq!(chunk2[0].size, Quantity::from("1000"));
2069
2070        assert!(stream.next().is_none());
2071    }
2072
2073    #[rstest]
2074    pub fn test_stream_deltas_from_local_file() {
2075        let filepath = get_test_data_path("csv/deltas_1.csv");
2076        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();
2077
2078        // With chunk_size=1, each delta gets its own chunk
2079        // First chunk: CLEAR
2080        let chunk1 = stream.next().unwrap().unwrap();
2081        assert_eq!(chunk1.len(), 1);
2082        assert_eq!(chunk1[0].action, BookAction::Clear);
2083
2084        // Second chunk: first data delta
2085        let chunk2 = stream.next().unwrap().unwrap();
2086        assert_eq!(chunk2.len(), 1);
2087        assert_eq!(chunk2[0].order.price, Price::from("6421.5"));
2088
2089        // Third chunk: second data delta
2090        let chunk3 = stream.next().unwrap().unwrap();
2091        assert_eq!(chunk3.len(), 1);
2092        assert_eq!(chunk3[0].order.size, Quantity::from("10000"));
2093
2094        assert!(stream.next().is_none());
2095    }
2096
2097    #[rstest]
2098    pub fn test_stream_deltas_with_limit() {
2099        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2100binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
2101binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
2102binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
2103binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
2104binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";
2105
2106        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
2107        std::fs::write(&temp_file, csv_data).unwrap();
2108
2109        // Test with limit of 3 records
2110        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
2111        let chunks: Vec<_> = stream.collect();
2112
2113        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
2114        assert_eq!(chunks.len(), 2);
2115        let chunk1 = chunks[0].as_ref().unwrap();
2116        assert_eq!(chunk1.len(), 2);
2117        let chunk2 = chunks[1].as_ref().unwrap();
2118        assert_eq!(chunk2.len(), 1);
2119
2120        // Total should be exactly 3 records due to limit
2121        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
2122        assert_eq!(total_deltas, 3);
2123
2124        std::fs::remove_file(&temp_file).ok();
2125    }
2126
2127    #[rstest]
2128    pub fn test_stream_quotes_with_limit() {
2129        let csv_data =
2130            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
2131binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
2132binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
2133binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
2134binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";
2135
2136        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
2137        std::fs::write(&temp_file, csv_data).unwrap();
2138
2139        // Test with limit of 2 records
2140        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
2141        let chunks: Vec<_> = stream.collect();
2142
2143        // Should have 1 chunk with 2 items (limited)
2144        assert_eq!(chunks.len(), 1);
2145        let chunk1 = chunks[0].as_ref().unwrap();
2146        assert_eq!(chunk1.len(), 2);
2147
2148        // Verify we get exactly 2 records
2149        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
2150        assert_eq!(total_quotes, 2);
2151
2152        std::fs::remove_file(&temp_file).ok();
2153    }
2154
2155    #[rstest]
2156    pub fn test_stream_trades_with_limit() {
2157        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
2158binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
2159binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
2160binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
2161binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
2162binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";
2163
2164        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
2165        std::fs::write(&temp_file, csv_data).unwrap();
2166
2167        // Test with limit of 3 records
2168        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
2169        let chunks: Vec<_> = stream.collect();
2170
2171        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
2172        assert_eq!(chunks.len(), 2);
2173        let chunk1 = chunks[0].as_ref().unwrap();
2174        assert_eq!(chunk1.len(), 2);
2175        let chunk2 = chunks[1].as_ref().unwrap();
2176        assert_eq!(chunk2.len(), 1);
2177
2178        // Verify we get exactly 3 records
2179        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
2180        assert_eq!(total_trades, 3);
2181
2182        std::fs::remove_file(&temp_file).ok();
2183    }
2184
2185    #[rstest]
2186    pub fn test_depth10_invalid_levels_error_at_construction() {
2187        let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
2188        std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();
2189
2190        let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
2191        assert!(result.is_err());
2192        let err_msg = result.err().unwrap().to_string();
2193        assert!(
2194            err_msg.contains("Invalid levels"),
2195            "Error should mention 'Invalid levels': {err_msg}"
2196        );
2197
2198        let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
2199        assert!(result.is_err());
2200
2201        let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
2202        assert!(result.is_ok());
2203
2204        let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
2205        assert!(result.is_ok());
2206
2207        std::fs::remove_file(&temp_file).ok();
2208    }
2209
2210    #[rstest]
2211    pub fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
2212        // CSV with:
2213        // - Initial snapshot (is_snapshot=true) at start
2214        // - Some deltas (is_snapshot=false)
2215        // - Mid-day snapshot (is_snapshot=true) - should trigger CLEAR
2216        // - Back to deltas (is_snapshot=false)
2217        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2218binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2219binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2220binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2221binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2222binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
2223binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
2224binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";
2225
2226        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
2227        std::fs::write(&temp_file, csv_data).unwrap();
2228
2229        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
2230        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2231
2232        let clear_count = all_deltas
2233            .iter()
2234            .filter(|d| d.action == BookAction::Clear)
2235            .count();
2236
2237        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
2238        assert_eq!(
2239            clear_count, 2,
2240            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
2241        );
2242
2243        // Verify CLEAR positions:
2244        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=CLEAR, 6=Add, 7=Add, 8=Update
2245        assert_eq!(all_deltas[0].action, BookAction::Clear);
2246        assert_eq!(all_deltas[5].action, BookAction::Clear);
2247
2248        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
2249        assert_eq!(
2250            all_deltas[0].flags & RecordFlag::F_LAST.value(),
2251            0,
2252            "CLEAR at index 0 should not have F_LAST flag"
2253        );
2254        assert_eq!(
2255            all_deltas[5].flags & RecordFlag::F_LAST.value(),
2256            0,
2257            "CLEAR at index 5 should not have F_LAST flag"
2258        );
2259
2260        std::fs::remove_file(&temp_file).ok();
2261    }
2262
2263    #[rstest]
2264    pub fn test_load_deltas_with_mid_snapshot_inserts_clear() {
2265        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
2266        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();
2267
2268        let clear_count = deltas
2269            .iter()
2270            .filter(|d| d.action == BookAction::Clear)
2271            .count();
2272
2273        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
2274        assert_eq!(
2275            clear_count, 2,
2276            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
2277        );
2278
2279        assert_eq!(deltas[0].action, BookAction::Clear);
2280
2281        let second_clear_idx = deltas
2282            .iter()
2283            .enumerate()
2284            .filter(|(_, d)| d.action == BookAction::Clear)
2285            .nth(1)
2286            .map(|(i, _)| i)
2287            .expect("Should have second CLEAR");
2288
2289        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=Delete, 6=CLEAR
2290        assert_eq!(
2291            second_clear_idx, 6,
2292            "Second CLEAR should be at index 6, found {second_clear_idx}"
2293        );
2294
2295        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
2296        assert_eq!(
2297            deltas[0].flags & RecordFlag::F_LAST.value(),
2298            0,
2299            "CLEAR at index 0 should not have F_LAST flag"
2300        );
2301        assert_eq!(
2302            deltas[6].flags & RecordFlag::F_LAST.value(),
2303            0,
2304            "CLEAR at index 6 should not have F_LAST flag"
2305        );
2306    }
2307
2308    #[rstest]
2309    fn test_stream_deltas_chunk_size_respects_clear() {
2310        // Test that chunk_size applies to total emitted deltas (including CLEARs)
2311        // With chunk_size=1, a snapshot boundary should emit CLEAR in one chunk
2312        // and the real delta in the next chunk
2313        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2314binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2315binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2316
2317        let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
2318        std::fs::write(&temp_file, csv_data).unwrap();
2319
2320        // chunk_size=1 should produce separate chunks for CLEAR and real deltas
2321        let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
2322        let chunks: Vec<_> = stream.collect();
2323
2324        // Should have 3 chunks: [CLEAR], [data], [data]
2325        assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
2326        assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
2327        assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
2328        assert_eq!(chunks[2].as_ref().unwrap().len(), 1);
2329
2330        // First chunk should be CLEAR
2331        assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
2332        // Second and third chunks should be data deltas
2333        assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
2334        assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);
2335
2336        std::fs::remove_file(&temp_file).ok();
2337    }
2338
2339    #[rstest]
2340    fn test_stream_deltas_limit_stops_at_clear() {
2341        // Test that limit=1 with snapshot data returns only the CLEAR delta
2342        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2343binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2344binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2345
2346        let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
2347        std::fs::write(&temp_file, csv_data).unwrap();
2348
2349        // limit=1 should only get the CLEAR delta
2350        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
2351        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2352
2353        assert_eq!(all_deltas.len(), 1);
2354        assert_eq!(all_deltas[0].action, BookAction::Clear);
2355
2356        std::fs::remove_file(&temp_file).ok();
2357    }
2358
2359    #[rstest]
2360    fn test_stream_deltas_limit_includes_clear() {
2361        // Test that limit counts total emitted deltas (including CLEARs)
2362        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2363binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2364binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2365binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2366binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2367binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2368
2369        let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
2370        std::fs::write(&temp_file, csv_data).unwrap();
2371
2372        // limit=4 should get exactly 4 deltas: 1 CLEAR + 3 data deltas
2373        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
2374        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2375
2376        assert_eq!(all_deltas.len(), 4);
2377        assert_eq!(all_deltas[0].action, BookAction::Clear);
2378        assert_eq!(all_deltas[1].action, BookAction::Add);
2379        assert_eq!(all_deltas[2].action, BookAction::Add);
2380        assert_eq!(all_deltas[3].action, BookAction::Update);
2381
2382        std::fs::remove_file(&temp_file).ok();
2383    }
2384
2385    #[rstest]
2386    fn test_stream_deltas_limit_sets_f_last() {
2387        // Test that F_LAST is set on the final delta when limit is reached
2388        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2389binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2390binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2391binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2392binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2393binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2394
2395        let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
2396        std::fs::write(&temp_file, csv_data).unwrap();
2397
2398        // limit=3 should get 3 deltas with F_LAST on the last one
2399        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
2400        let chunks: Vec<_> = stream.collect();
2401
2402        // Should have 1 chunk with 3 deltas
2403        assert_eq!(chunks.len(), 1);
2404        let deltas = chunks[0].as_ref().unwrap();
2405        assert_eq!(deltas.len(), 3);
2406
2407        // Final delta should have F_LAST flag
2408        assert_eq!(
2409            deltas[2].flags & RecordFlag::F_LAST.value(),
2410            RecordFlag::F_LAST.value(),
2411            "Final delta should have F_LAST flag when limit is reached"
2412        );
2413
2414        std::fs::remove_file(&temp_file).ok();
2415    }
2416
2417    #[rstest]
2418    fn test_stream_deltas_chunk_boundary_no_f_last() {
2419        // Test that F_LAST is NOT set when only the chunk_size boundary is hit (more data follows)
2420        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2421binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
2422binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
2423binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";
2424
2425        let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
2426        std::fs::write(&temp_file, csv_data).unwrap();
2427
2428        // chunk_size=2, no limit - first chunk should NOT have F_LAST (more data follows)
2429        let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();
2430
2431        let chunk1 = stream.next().unwrap().unwrap();
2432        assert_eq!(chunk1.len(), 2);
2433
2434        // First chunk's last delta should NOT have F_LAST (more data follows with same timestamp)
2435        assert_eq!(
2436            chunk1[1].flags & RecordFlag::F_LAST.value(),
2437            0,
2438            "Mid-stream chunk should not have F_LAST flag"
2439        );
2440
2441        // Second chunk exists and has F_LAST (end of file)
2442        let chunk2 = stream.next().unwrap().unwrap();
2443        assert_eq!(chunk2.len(), 1);
2444        assert_eq!(
2445            chunk2[0].flags & RecordFlag::F_LAST.value(),
2446            RecordFlag::F_LAST.value(),
2447            "Final chunk at EOF should have F_LAST flag"
2448        );
2449
2450        std::fs::remove_file(&temp_file).ok();
2451    }
2452}