nautilus_backtest/
data_iterator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16#![allow(dead_code)]
17#![allow(clippy::module_name_repetitions)]
18
19use std::collections::{BinaryHeap, HashMap};
20
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
24/// Internal convenience struct to keep heap entries ordered by `(ts_init, priority)`.
25#[derive(Debug, Eq, PartialEq)]
26struct HeapEntry {
27    ts: UnixNanos,
28    priority: i32,
29    index: usize,
30}
31
32impl Ord for HeapEntry {
33    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34        // min-heap on ts, then priority sign (+/-) then index
35        self.ts
36            .cmp(&other.ts)
37            .then_with(|| self.priority.cmp(&other.priority))
38            .then_with(|| self.index.cmp(&other.index))
39            .reverse() // BinaryHeap is max by default -> reverse for min behaviour
40    }
41}
42
43impl PartialOrd for HeapEntry {
44    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49/// Multi-stream, time-ordered data iterator used by the backtest engine.
50#[derive(Debug, Default)]
51pub struct BacktestDataIterator {
52    streams: HashMap<i32, Vec<Data>>, // key: priority, value: Vec<Data>
53    names: HashMap<i32, String>,      // priority -> name
54    priorities: HashMap<String, i32>, // name -> priority
55    indices: HashMap<i32, usize>,     // cursor per stream
56    heap: BinaryHeap<HeapEntry>,
57    single_priority: Option<i32>,
58    next_priority_counter: i32, // monotonically increasing counter used to assign priorities
59}
60
61impl BacktestDataIterator {
62    /// Create an empty [`BacktestDataIterator`].
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            streams: HashMap::new(),
67            names: HashMap::new(),
68            priorities: HashMap::new(),
69            indices: HashMap::new(),
70            heap: BinaryHeap::new(),
71            single_priority: None,
72            next_priority_counter: 0,
73        }
74    }
75
76    /// Add (or replace) a named data stream.  `append_data=true` gives the stream
77    /// lower priority when timestamps tie, mirroring the original behaviour.
78    pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
79        if data.is_empty() {
80            return;
81        }
82
83        // Ensure sorted by ts_init
84        data.sort_by_key(nautilus_model::data::HasTsInit::ts_init);
85
86        let priority = if let Some(p) = self.priorities.get(name) {
87            // Replace existing stream – remove previous traces then re-insert below.
88            *p
89        } else {
90            self.next_priority_counter += 1;
91            let sign = if append_data { 1 } else { -1 };
92            sign * self.next_priority_counter
93        };
94
95        // Remove old state if any
96        self.remove_data(name, true);
97
98        self.streams.insert(priority, data);
99        self.names.insert(priority, name.to_string());
100        self.priorities.insert(name.to_string(), priority);
101        self.indices.insert(priority, 0);
102
103        self.rebuild_heap();
104    }
105
106    /// Remove a stream.  `complete_remove` also discards placeholder generator
107    /// (not implemented yet).
108    pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
109        if let Some(priority) = self.priorities.remove(name) {
110            self.streams.remove(&priority);
111            self.indices.remove(&priority);
112            self.names.remove(&priority);
113
114            // Rebuild heap sans removed priority
115            self.heap.retain(|e| e.priority != priority);
116
117            if self.heap.is_empty() {
118                self.single_priority = None;
119            }
120        }
121        if complete_remove {
122            // Placeholder for future generator cleanup
123        }
124    }
125
126    /// Move cursor of stream to `index` (0-based).
127    pub fn set_index(&mut self, name: &str, index: usize) {
128        if let Some(priority) = self.priorities.get(name) {
129            self.indices.insert(*priority, index);
130            self.rebuild_heap();
131        }
132    }
133
134    /// Return next Data element across all streams in chronological order.
135    #[allow(clippy::should_implement_trait)]
136    pub fn next(&mut self) -> Option<Data> {
137        // Fast path for single stream
138        if let Some(p) = self.single_priority {
139            let data = self.streams.get_mut(&p)?;
140            let idx = self.indices.get_mut(&p)?;
141            if *idx >= data.len() {
142                return None;
143            }
144            let element = data[*idx].clone();
145            *idx += 1;
146            return Some(element);
147        }
148
149        // Multi-stream path using heap
150        let entry = self.heap.pop()?;
151        let stream_vec = self.streams.get(&entry.priority)?;
152        let element = stream_vec[entry.index].clone();
153
154        // Advance cursor and push next entry
155        let next_index = entry.index + 1;
156        self.indices.insert(entry.priority, next_index);
157        if next_index < stream_vec.len() {
158            self.heap.push(HeapEntry {
159                ts: stream_vec[next_index].ts_init(),
160                priority: entry.priority,
161                index: next_index,
162            });
163        }
164
165        Some(element)
166    }
167
168    #[must_use]
169    pub fn is_done(&self) -> bool {
170        if let Some(p) = self.single_priority {
171            if let Some(idx) = self.indices.get(&p)
172                && let Some(vec) = self.streams.get(&p)
173            {
174                return *idx >= vec.len();
175            }
176            true
177        } else {
178            self.heap.is_empty()
179        }
180    }
181
182    fn rebuild_heap(&mut self) {
183        self.heap.clear();
184
185        // Determine if we’re in single-stream mode
186        if self.streams.len() == 1 {
187            self.single_priority = self.streams.keys().next().copied();
188            return;
189        }
190        self.single_priority = None;
191
192        for (&priority, vec) in &self.streams {
193            let idx = *self.indices.get(&priority).unwrap_or(&0);
194            if idx < vec.len() {
195                self.heap.push(HeapEntry {
196                    ts: vec[idx].ts_init(),
197                    priority,
198                    index: idx,
199                });
200            }
201        }
202    }
203}
204
205////////////////////////////////////////////////////////////////////////////////
206// Tests
207////////////////////////////////////////////////////////////////////////////////
208
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id` whose event and init timestamps are both `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let tick = QuoteTick::new(
            InstrumentId::from(id),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        );
        Data::Quote(tick)
    }

    /// A lone stream is emitted in timestamp order and then reports exhaustion.
    #[rstest]
    fn test_single_stream() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "main",
            vec![quote("BTC-PERP.BINANCE", 1), quote("BTC-PERP.BINANCE", 3)],
            true,
        );

        for expected in [1_u64, 3] {
            assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(expected));
        }
        assert!(it.next().is_none());
    }

    /// Two interleaved streams are merged into one chronological sequence.
    #[rstest]
    fn test_two_stream_merge() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        // Drain the iterator, keeping only each element's init timestamp.
        let ts: Vec<_> = std::iter::from_fn(|| it.next())
            .map(|d| d.ts_init())
            .collect();

        let expected: Vec<_> = [1_u64, 2, 3, 4]
            .into_iter()
            .map(UnixNanos::from)
            .collect();
        assert_eq!(ts, expected);
    }
}
263}