Skip to main content

nautilus_backtest/
data_iterator.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::collections::BinaryHeap;
17
18use ahash::AHashMap;
19use nautilus_core::UnixNanos;
20use nautilus_model::data::{Data, HasTsInit};
21
22/// Internal convenience struct to keep heap entries ordered by `(ts_init, priority)`.
23#[derive(Debug, Eq, PartialEq)]
24struct HeapEntry {
25    ts: UnixNanos,
26    priority: i32,
27    index: usize,
28}
29
30impl Ord for HeapEntry {
31    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32        // min-heap on ts, then priority sign (+/-) then index
33        self.ts
34            .cmp(&other.ts)
35            .then_with(|| self.priority.cmp(&other.priority))
36            .then_with(|| self.index.cmp(&other.index))
37            .reverse() // BinaryHeap is max by default -> reverse for min behaviour
38    }
39}
40
41impl PartialOrd for HeapEntry {
42    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
43        Some(self.cmp(other))
44    }
45}
46
47/// Multi-stream, time-ordered data iterator used by the backtest engine.
48#[derive(Debug, Default)]
49pub struct BacktestDataIterator {
50    streams: AHashMap<i32, Vec<Data>>, // key: priority, value: Vec<Data>
51    names: AHashMap<i32, String>,      // priority -> name
52    priorities: AHashMap<String, i32>, // name -> priority
53    indices: AHashMap<i32, usize>,     // cursor per stream
54    heap: BinaryHeap<HeapEntry>,
55    single_priority: Option<i32>,
56    next_priority_counter: i32, // monotonically increasing counter used to assign priorities
57}
58
59impl BacktestDataIterator {
60    /// Create an empty [`BacktestDataIterator`].
61    #[must_use]
62    pub fn new() -> Self {
63        Self {
64            streams: AHashMap::new(),
65            names: AHashMap::new(),
66            priorities: AHashMap::new(),
67            indices: AHashMap::new(),
68            heap: BinaryHeap::new(),
69            single_priority: None,
70            next_priority_counter: 0,
71        }
72    }
73
74    /// Add (or replace) a named data stream.  `append_data=true` gives the stream
75    /// lower priority when timestamps tie, mirroring the original behaviour.
76    pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
77        if data.is_empty() {
78            return;
79        }
80
81        // Ensure sorted by ts_init
82        data.sort_by_key(HasTsInit::ts_init);
83
84        let priority = if let Some(p) = self.priorities.get(name) {
85            // Replace existing stream – remove previous traces then re-insert below.
86            *p
87        } else {
88            self.next_priority_counter += 1;
89            let sign = if append_data { 1 } else { -1 };
90            sign * self.next_priority_counter
91        };
92
93        // Remove old state if any
94        self.remove_data(name, true);
95
96        self.streams.insert(priority, data);
97        self.names.insert(priority, name.to_string());
98        self.priorities.insert(name.to_string(), priority);
99        self.indices.insert(priority, 0);
100
101        self.rebuild_heap();
102    }
103
104    /// Remove a stream.  `complete_remove` also discards placeholder generator
105    /// (not implemented yet).
106    pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
107        if let Some(priority) = self.priorities.remove(name) {
108            self.streams.remove(&priority);
109            self.indices.remove(&priority);
110            self.names.remove(&priority);
111
112            // Rebuild heap sans removed priority
113            self.heap.retain(|e| e.priority != priority);
114
115            if self.heap.is_empty() {
116                self.single_priority = None;
117            }
118        }
119        if complete_remove {
120            // Placeholder for future generator cleanup
121        }
122    }
123
124    /// Move cursor of stream to `index` (0-based).
125    pub fn set_index(&mut self, name: &str, index: usize) {
126        if let Some(priority) = self.priorities.get(name) {
127            self.indices.insert(*priority, index);
128            self.rebuild_heap();
129        }
130    }
131
132    /// Reset all stream cursors to the beginning.
133    pub fn reset_all_cursors(&mut self) {
134        for idx in self.indices.values_mut() {
135            *idx = 0;
136        }
137        self.rebuild_heap();
138    }
139
140    /// Return next Data element across all streams in chronological order.
141    #[allow(clippy::should_implement_trait)]
142    pub fn next(&mut self) -> Option<Data> {
143        // Fast path for single stream
144        if let Some(p) = self.single_priority {
145            let data = self.streams.get_mut(&p)?;
146            let idx = self.indices.get_mut(&p)?;
147            if *idx >= data.len() {
148                return None;
149            }
150            let element = data[*idx].clone();
151            *idx += 1;
152            return Some(element);
153        }
154
155        // Multi-stream path using heap
156        let entry = self.heap.pop()?;
157        let stream_vec = self.streams.get(&entry.priority)?;
158        let element = stream_vec[entry.index].clone();
159
160        // Advance cursor and push next entry
161        let next_index = entry.index + 1;
162        self.indices.insert(entry.priority, next_index);
163        if next_index < stream_vec.len() {
164            self.heap.push(HeapEntry {
165                ts: stream_vec[next_index].ts_init(),
166                priority: entry.priority,
167                index: next_index,
168            });
169        }
170
171        Some(element)
172    }
173
174    #[must_use]
175    pub fn is_done(&self) -> bool {
176        if let Some(p) = self.single_priority {
177            if let Some(idx) = self.indices.get(&p)
178                && let Some(vec) = self.streams.get(&p)
179            {
180                return *idx >= vec.len();
181            }
182            true
183        } else {
184            self.heap.is_empty()
185        }
186    }
187
188    fn rebuild_heap(&mut self) {
189        self.heap.clear();
190
191        // Determine if we’re in single-stream mode
192        if self.streams.len() == 1 {
193            self.single_priority = self.streams.keys().next().copied();
194            return;
195        }
196        self.single_priority = None;
197
198        for (&priority, vec) in &self.streams {
199            let idx = *self.indices.get(&priority).unwrap_or(&0);
200            if idx < vec.len() {
201                self.heap.push(HeapEntry {
202                    ts: vec[idx].ts_init(),
203                    priority,
204                    index: idx,
205                });
206            }
207        }
208    }
209}
210
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for `id` with both timestamp arguments set to `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains the iterator, returning the `ts_init` of every yielded element.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        let mut ts = Vec::new();
        while let Some(d) = it.next() {
            ts.push(d.ts_init().as_u64());
        }
        ts
    }

    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 100), quote("A.B", 200), quote("A.B", 300)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 3)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 300), quote("A.B", 100), quote("A.B", 200)],
            true,
        );

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300]);
    }

    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_three_stream_merge_sorted() {
        let mut it = BacktestDataIterator::new();
        let data_len = 5;
        let d0: Vec<Data> = (0..data_len).map(|k| quote("A.B", 3 * k)).collect();
        let d1: Vec<Data> = (0..data_len).map(|k| quote("C.D", 3 * k + 1)).collect();
        let d2: Vec<Data> = (0..data_len).map(|k| quote("E.F", 3 * k + 2)).collect();
        it.add_data("d0", d0, true);
        it.add_data("d1", d1, true);
        it.add_data("d2", d2, true);

        // The three streams interleave exactly, so the merged output is 0, 1, ..., 14.
        let ts = collect_ts(&mut it);
        assert_eq!(ts.len(), 15);
        assert_eq!(ts, (0..3 * data_len).collect::<Vec<u64>>());
    }

    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 100), quote("A.B", 300)], true);
        it.add_data("s2", vec![quote("C.D", 200), quote("C.D", 400)], true);

        assert_eq!(collect_ts(&mut it), vec![100, 200, 300, 400]);
    }

    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], true);

        // Both at same timestamp, FIFO order (a before b)
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        assert_eq!(first.ts_init().as_u64(), 100);
        assert_eq!(second.ts_init().as_u64(), 100);
        assert_eq!(first.instrument_id(), InstrumentId::from("A.B"));
        assert_eq!(second.instrument_id(), InstrumentId::from("C.D"));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut it = BacktestDataIterator::new();
        // "a" is appended (lower priority), "b" is prepended (higher priority)
        it.add_data("a", vec![quote("A.B", 100)], true);
        it.add_data("b", vec![quote("C.D", 100)], false);

        // "b" (prepend) should come first despite being added second
        let first = it.next().unwrap();
        let second = it.next().unwrap();
        // Prepend stream (negative priority) wins ties over append (positive)
        assert_eq!(first.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(second.instrument_id(), InstrumentId::from("A.B"));
    }

    #[rstest]
    fn test_is_done_empty_iterator() {
        let it = BacktestDataIterator::new();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_after_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_is_done_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1)], true);
        it.add_data("s2", vec![quote("C.D", 2)], true);

        assert!(!it.is_done());
        it.next();
        assert!(!it.is_done());
        it.next();
        assert!(it.is_done());
    }

    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![
                quote("A.B", 0),
                quote("A.B", 1),
                quote("A.B", 2),
                quote("A.B", 3),
            ],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 0);
        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        let remaining = collect_ts(&mut it);
        assert_eq!(remaining, vec![2, 3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        it.remove_data("a", false);

        assert_eq!(collect_ts(&mut it), vec![2]);
    }

    #[rstest]
    fn test_remove_stream_mid_iteration_keeps_remaining_cursor() {
        let mut it = BacktestDataIterator::new();
        it.add_data("a", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("b", vec![quote("C.D", 2)], true);

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 1);

        // Removing "b" leaves a single, partially consumed stream.
        it.remove_data("b", false);

        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![3]);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut it = BacktestDataIterator::new();
        it.add_data("x", vec![quote("A.B", 1)], true);
        it.add_data("y", vec![quote("C.D", 2)], true);

        it.remove_data("x", false);
        it.remove_data("y", false);

        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);

        it.remove_data("s", true);
        assert!(it.is_done());
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        assert_eq!(it.next().unwrap().ts_init().as_u64(), 10);

        it.set_index("s", 0);

        assert_eq!(collect_ts(&mut it), vec![10, 20, 30]);
    }

    #[rstest]
    fn test_set_index_skips_forward() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "s",
            vec![quote("A.B", 10), quote("A.B", 20), quote("A.B", 30)],
            true,
        );

        it.set_index("s", 2);

        assert_eq!(collect_ts(&mut it), vec![30]);
    }

    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1)], true);

        it.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut it), vec![1]);
    }

    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s", vec![quote("A.B", 1), quote("A.B", 2)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert!(!it.is_done());
        assert_eq!(collect_ts(&mut it), vec![1, 2]);
    }

    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);

        collect_ts(&mut it);
        assert!(it.is_done());

        it.reset_all_cursors();
        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }

    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("X", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("X", vec![quote("A.B", 10)], true);

        assert_eq!(collect_ts(&mut it), vec![10]);
    }

    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", vec![], true);

        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut it = BacktestDataIterator::new();
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut it = BacktestDataIterator::new();
        it.add_data("batch_0", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("batch_1", vec![quote("A.B", 2), quote("A.B", 4)], true);

        assert_eq!(collect_ts(&mut it), vec![1, 2, 3, 4]);
    }
}