nautilus_backtest/
data_iterator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16#![allow(dead_code)]
17#![allow(clippy::module_name_repetitions)]
18
19use std::collections::BinaryHeap;
20
21use ahash::AHashMap;
22use nautilus_core::UnixNanos;
23use nautilus_model::data::{Data, HasTsInit};
24
25/// Internal convenience struct to keep heap entries ordered by `(ts_init, priority)`.
26#[derive(Debug, Eq, PartialEq)]
27struct HeapEntry {
28    ts: UnixNanos,
29    priority: i32,
30    index: usize,
31}
32
33impl Ord for HeapEntry {
34    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
35        // min-heap on ts, then priority sign (+/-) then index
36        self.ts
37            .cmp(&other.ts)
38            .then_with(|| self.priority.cmp(&other.priority))
39            .then_with(|| self.index.cmp(&other.index))
40            .reverse() // BinaryHeap is max by default -> reverse for min behaviour
41    }
42}
43
44impl PartialOrd for HeapEntry {
45    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
46        Some(self.cmp(other))
47    }
48}
49
50/// Multi-stream, time-ordered data iterator used by the backtest engine.
51#[derive(Debug, Default)]
52pub struct BacktestDataIterator {
53    streams: AHashMap<i32, Vec<Data>>, // key: priority, value: Vec<Data>
54    names: AHashMap<i32, String>,      // priority -> name
55    priorities: AHashMap<String, i32>, // name -> priority
56    indices: AHashMap<i32, usize>,     // cursor per stream
57    heap: BinaryHeap<HeapEntry>,
58    single_priority: Option<i32>,
59    next_priority_counter: i32, // monotonically increasing counter used to assign priorities
60}
61
62impl BacktestDataIterator {
63    /// Create an empty [`BacktestDataIterator`].
64    #[must_use]
65    pub fn new() -> Self {
66        Self {
67            streams: AHashMap::new(),
68            names: AHashMap::new(),
69            priorities: AHashMap::new(),
70            indices: AHashMap::new(),
71            heap: BinaryHeap::new(),
72            single_priority: None,
73            next_priority_counter: 0,
74        }
75    }
76
77    /// Add (or replace) a named data stream.  `append_data=true` gives the stream
78    /// lower priority when timestamps tie, mirroring the original behaviour.
79    pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
80        if data.is_empty() {
81            return;
82        }
83
84        // Ensure sorted by ts_init
85        data.sort_by_key(HasTsInit::ts_init);
86
87        let priority = if let Some(p) = self.priorities.get(name) {
88            // Replace existing stream – remove previous traces then re-insert below.
89            *p
90        } else {
91            self.next_priority_counter += 1;
92            let sign = if append_data { 1 } else { -1 };
93            sign * self.next_priority_counter
94        };
95
96        // Remove old state if any
97        self.remove_data(name, true);
98
99        self.streams.insert(priority, data);
100        self.names.insert(priority, name.to_string());
101        self.priorities.insert(name.to_string(), priority);
102        self.indices.insert(priority, 0);
103
104        self.rebuild_heap();
105    }
106
107    /// Remove a stream.  `complete_remove` also discards placeholder generator
108    /// (not implemented yet).
109    pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110        if let Some(priority) = self.priorities.remove(name) {
111            self.streams.remove(&priority);
112            self.indices.remove(&priority);
113            self.names.remove(&priority);
114
115            // Rebuild heap sans removed priority
116            self.heap.retain(|e| e.priority != priority);
117
118            if self.heap.is_empty() {
119                self.single_priority = None;
120            }
121        }
122        if complete_remove {
123            // Placeholder for future generator cleanup
124        }
125    }
126
127    /// Move cursor of stream to `index` (0-based).
128    pub fn set_index(&mut self, name: &str, index: usize) {
129        if let Some(priority) = self.priorities.get(name) {
130            self.indices.insert(*priority, index);
131            self.rebuild_heap();
132        }
133    }
134
135    /// Return next Data element across all streams in chronological order.
136    #[allow(clippy::should_implement_trait)]
137    pub fn next(&mut self) -> Option<Data> {
138        // Fast path for single stream
139        if let Some(p) = self.single_priority {
140            let data = self.streams.get_mut(&p)?;
141            let idx = self.indices.get_mut(&p)?;
142            if *idx >= data.len() {
143                return None;
144            }
145            let element = data[*idx].clone();
146            *idx += 1;
147            return Some(element);
148        }
149
150        // Multi-stream path using heap
151        let entry = self.heap.pop()?;
152        let stream_vec = self.streams.get(&entry.priority)?;
153        let element = stream_vec[entry.index].clone();
154
155        // Advance cursor and push next entry
156        let next_index = entry.index + 1;
157        self.indices.insert(entry.priority, next_index);
158        if next_index < stream_vec.len() {
159            self.heap.push(HeapEntry {
160                ts: stream_vec[next_index].ts_init(),
161                priority: entry.priority,
162                index: next_index,
163            });
164        }
165
166        Some(element)
167    }
168
169    #[must_use]
170    pub fn is_done(&self) -> bool {
171        if let Some(p) = self.single_priority {
172            if let Some(idx) = self.indices.get(&p)
173                && let Some(vec) = self.streams.get(&p)
174            {
175                return *idx >= vec.len();
176            }
177            true
178        } else {
179            self.heap.is_empty()
180        }
181    }
182
183    fn rebuild_heap(&mut self) {
184        self.heap.clear();
185
186        // Determine if we’re in single-stream mode
187        if self.streams.len() == 1 {
188            self.single_priority = self.streams.keys().next().copied();
189            return;
190        }
191        self.single_priority = None;
192
193        for (&priority, vec) in &self.streams {
194            let idx = *self.indices.get(&priority).unwrap_or(&0);
195            if idx < vec.len() {
196                self.heap.push(HeapEntry {
197                    ts: vec[idx].ts_init(),
198                    priority,
199                    index: idx,
200                });
201            }
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id` with both `ts_event` and `ts_init` set to `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        Data::Quote(QuoteTick::new(
            InstrumentId::from(id),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Converts raw nanosecond values into `UnixNanos`.
    fn nanos(values: &[u64]) -> Vec<UnixNanos> {
        values.iter().copied().map(UnixNanos::from).collect()
    }

    /// Drains `it` and returns each emitted element's `ts_init`, in emission order.
    fn drain_ts(it: &mut BacktestDataIterator) -> Vec<UnixNanos> {
        std::iter::from_fn(|| it.next()).map(|d| d.ts_init()).collect()
    }

    /// Drains `it` and returns each emitted quote's instrument ID, in emission order.
    fn drain_ids(it: &mut BacktestDataIterator) -> Vec<InstrumentId> {
        std::iter::from_fn(|| it.next())
            .filter_map(|d| match d {
                Data::Quote(q) => Some(q.instrument_id),
                _ => None,
            })
            .collect()
    }

    #[rstest]
    fn test_single_stream() {
        let mut it = BacktestDataIterator::new();
        let stream = vec![quote("BTC-PERP.BINANCE", 1), quote("BTC-PERP.BINANCE", 3)];
        it.add_data("main", stream, true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_two_stream_merge() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);
        assert_eq!(drain_ts(&mut it), nanos(&[1, 2, 3, 4]));
        assert!(it.is_done());
    }

    #[rstest]
    fn test_add_data_sorts_unsorted_input() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 3), quote("A.B", 1), quote("A.B", 2)], true);
        assert_eq!(drain_ts(&mut it), nanos(&[1, 2, 3]));
    }

    #[rstest]
    fn test_ties_favor_non_appended_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("appended", vec![quote("A.B", 1), quote("A.B", 2)], true);
        it.add_data("prepended", vec![quote("C.D", 1), quote("C.D", 2)], false);
        let (a, c) = (InstrumentId::from("A.B"), InstrumentId::from("C.D"));
        assert_eq!(drain_ids(&mut it), vec![c, a, c, a]);
    }

    #[rstest]
    fn test_ties_between_appended_streams_follow_insertion_order() {
        let mut it = BacktestDataIterator::new();
        it.add_data("first", vec![quote("A.B", 5)], true);
        it.add_data("second", vec![quote("C.D", 5)], true);
        let (a, c) = (InstrumentId::from("A.B"), InstrumentId::from("C.D"));
        assert_eq!(drain_ids(&mut it), vec![a, c]);
    }

    #[rstest]
    fn test_remove_data_keeps_remaining_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 3)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 4)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));

        it.remove_data("s2", true);
        assert_eq!(drain_ts(&mut it), nanos(&[3]));
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_last_stream_is_done() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 1)], true);
        it.remove_data("main", true);
        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 1), quote("A.B", 2), quote("A.B", 3)], true);
        assert_eq!(drain_ts(&mut it), nanos(&[1, 2, 3]));

        it.set_index("main", 1);
        assert_eq!(drain_ts(&mut it), nanos(&[2, 3]));
    }

    #[rstest]
    fn test_empty_input_is_ignored() {
        let mut it = BacktestDataIterator::new();
        it.add_data("empty", Vec::new(), true);
        assert!(it.is_done());
        assert!(it.next().is_none());
    }
}