nautilus_backtest/
data_iterator.rs1#![allow(dead_code)]
17#![allow(clippy::module_name_repetitions)]
18
19use std::collections::BinaryHeap;
20
21use ahash::AHashMap;
22use nautilus_core::UnixNanos;
23use nautilus_model::data::{Data, HasTsInit};
24
25#[derive(Debug, Eq, PartialEq)]
27struct HeapEntry {
28 ts: UnixNanos,
29 priority: i32,
30 index: usize,
31}
32
33impl Ord for HeapEntry {
34 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
35 self.ts
37 .cmp(&other.ts)
38 .then_with(|| self.priority.cmp(&other.priority))
39 .then_with(|| self.index.cmp(&other.index))
40 .reverse() }
42}
43
44impl PartialOrd for HeapEntry {
45 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
46 Some(self.cmp(other))
47 }
48}
49
50#[derive(Debug, Default)]
52pub struct BacktestDataIterator {
53 streams: AHashMap<i32, Vec<Data>>, names: AHashMap<i32, String>, priorities: AHashMap<String, i32>, indices: AHashMap<i32, usize>, heap: BinaryHeap<HeapEntry>,
58 single_priority: Option<i32>,
59 next_priority_counter: i32, }
61
62impl BacktestDataIterator {
63 #[must_use]
65 pub fn new() -> Self {
66 Self {
67 streams: AHashMap::new(),
68 names: AHashMap::new(),
69 priorities: AHashMap::new(),
70 indices: AHashMap::new(),
71 heap: BinaryHeap::new(),
72 single_priority: None,
73 next_priority_counter: 0,
74 }
75 }
76
77 pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
80 if data.is_empty() {
81 return;
82 }
83
84 data.sort_by_key(HasTsInit::ts_init);
86
87 let priority = if let Some(p) = self.priorities.get(name) {
88 *p
90 } else {
91 self.next_priority_counter += 1;
92 let sign = if append_data { 1 } else { -1 };
93 sign * self.next_priority_counter
94 };
95
96 self.remove_data(name, true);
98
99 self.streams.insert(priority, data);
100 self.names.insert(priority, name.to_string());
101 self.priorities.insert(name.to_string(), priority);
102 self.indices.insert(priority, 0);
103
104 self.rebuild_heap();
105 }
106
107 pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
110 if let Some(priority) = self.priorities.remove(name) {
111 self.streams.remove(&priority);
112 self.indices.remove(&priority);
113 self.names.remove(&priority);
114
115 self.heap.retain(|e| e.priority != priority);
117
118 if self.heap.is_empty() {
119 self.single_priority = None;
120 }
121 }
122 if complete_remove {
123 }
125 }
126
127 pub fn set_index(&mut self, name: &str, index: usize) {
129 if let Some(priority) = self.priorities.get(name) {
130 self.indices.insert(*priority, index);
131 self.rebuild_heap();
132 }
133 }
134
135 #[allow(clippy::should_implement_trait)]
137 pub fn next(&mut self) -> Option<Data> {
138 if let Some(p) = self.single_priority {
140 let data = self.streams.get_mut(&p)?;
141 let idx = self.indices.get_mut(&p)?;
142 if *idx >= data.len() {
143 return None;
144 }
145 let element = data[*idx].clone();
146 *idx += 1;
147 return Some(element);
148 }
149
150 let entry = self.heap.pop()?;
152 let stream_vec = self.streams.get(&entry.priority)?;
153 let element = stream_vec[entry.index].clone();
154
155 let next_index = entry.index + 1;
157 self.indices.insert(entry.priority, next_index);
158 if next_index < stream_vec.len() {
159 self.heap.push(HeapEntry {
160 ts: stream_vec[next_index].ts_init(),
161 priority: entry.priority,
162 index: next_index,
163 });
164 }
165
166 Some(element)
167 }
168
169 #[must_use]
170 pub fn is_done(&self) -> bool {
171 if let Some(p) = self.single_priority {
172 if let Some(idx) = self.indices.get(&p)
173 && let Some(vec) = self.streams.get(&p)
174 {
175 return *idx >= vec.len();
176 }
177 true
178 } else {
179 self.heap.is_empty()
180 }
181 }
182
183 fn rebuild_heap(&mut self) {
184 self.heap.clear();
185
186 if self.streams.len() == 1 {
188 self.single_priority = self.streams.keys().next().copied();
189 return;
190 }
191 self.single_priority = None;
192
193 for (&priority, vec) in &self.streams {
194 let idx = *self.indices.get(&priority).unwrap_or(&0);
195 if idx < vec.len() {
196 self.heap.push(HeapEntry {
197 ts: vec[idx].ts_init(),
198 priority,
199 index: idx,
200 });
201 }
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id` whose `ts_event` and `ts_init` are `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let inst = InstrumentId::from(id);
        Data::Quote(QuoteTick::new(
            inst,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        ))
    }

    /// Drains `it`, returning the `ts_init` of every element in yield order.
    fn drain_ts(it: &mut BacktestDataIterator) -> Vec<UnixNanos> {
        std::iter::from_fn(|| it.next())
            .map(|d| d.ts_init())
            .collect()
    }

    /// Converts raw nanosecond values into `UnixNanos` for comparisons.
    fn ts(values: &[u64]) -> Vec<UnixNanos> {
        values.iter().copied().map(UnixNanos::from).collect()
    }

    /// Extracts the instrument of a quote element.
    fn instrument(data: &Data) -> InstrumentId {
        match data {
            Data::Quote(q) => q.instrument_id,
            _ => panic!("expected a quote"),
        }
    }

    #[rstest]
    fn test_single_stream() {
        let mut it = BacktestDataIterator::new();
        let stream = vec![quote("BTC-PERP.BINANCE", 1), quote("BTC-PERP.BINANCE", 3)];
        it.add_data("main", stream, true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_two_stream_merge() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);
        assert_eq!(drain_ts(&mut it), ts(&[1, 2, 3, 4]));
    }

    #[rstest]
    fn test_unsorted_input_is_sorted() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "main",
            vec![quote("A.B", 3), quote("A.B", 1), quote("A.B", 2)],
            true,
        );
        assert_eq!(drain_ts(&mut it), ts(&[1, 2, 3]));
    }

    #[rstest]
    fn test_prepended_stream_wins_timestamp_ties() {
        let mut it = BacktestDataIterator::new();
        it.add_data("appended", vec![quote("A.B", 1)], true);
        it.add_data("prepended", vec![quote("C.D", 1)], false);
        assert_eq!(instrument(&it.next().unwrap()), InstrumentId::from("C.D"));
        assert_eq!(instrument(&it.next().unwrap()), InstrumentId::from("A.B"));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_empty_data_is_ignored() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", Vec::new(), true);
        assert!(it.is_done());
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_is_done_tracks_consumption() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 1)], true);
        assert!(!it.is_done());
        assert!(it.next().is_some());
        assert!(it.is_done());
    }

    #[rstest]
    fn test_remove_data_mid_iteration() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        it.remove_data("s1", false);
        assert_eq!(drain_ts(&mut it), ts(&[2, 3]));
    }

    #[rstest]
    fn test_remove_unknown_is_noop() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 1)], true);
        it.remove_data("missing", true);
        assert_eq!(drain_ts(&mut it), ts(&[1]));
    }

    #[rstest]
    fn test_set_index_skips_ahead() {
        let mut it = BacktestDataIterator::new();
        it.add_data(
            "main",
            vec![quote("A.B", 1), quote("A.B", 2), quote("A.B", 3)],
            true,
        );
        it.set_index("main", 2);
        assert_eq!(drain_ts(&mut it), ts(&[3]));
    }

    #[rstest]
    fn test_readding_stream_resets_position() {
        let mut it = BacktestDataIterator::new();
        it.add_data("main", vec![quote("A.B", 1), quote("A.B", 2)], true);
        assert_eq!(it.next().unwrap().ts_init(), UnixNanos::from(1));
        it.add_data("main", vec![quote("A.B", 5), quote("A.B", 6)], true);
        assert_eq!(drain_ts(&mut it), ts(&[5, 6]));
    }
}