1use std::collections::BinaryHeap;
17
18use ahash::AHashMap;
19use nautilus_core::UnixNanos;
20use nautilus_model::data::{Data, HasTsInit};
21
22#[derive(Debug, Eq, PartialEq)]
24struct HeapEntry {
25 ts: UnixNanos,
26 priority: i32,
27 index: usize,
28}
29
30impl Ord for HeapEntry {
31 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32 self.ts
34 .cmp(&other.ts)
35 .then_with(|| self.priority.cmp(&other.priority))
36 .then_with(|| self.index.cmp(&other.index))
37 .reverse() }
39}
40
41impl PartialOrd for HeapEntry {
42 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
43 Some(self.cmp(other))
44 }
45}
46
47#[derive(Debug, Default)]
49pub struct BacktestDataIterator {
50 streams: AHashMap<i32, Vec<Data>>, names: AHashMap<i32, String>, priorities: AHashMap<String, i32>, indices: AHashMap<i32, usize>, heap: BinaryHeap<HeapEntry>,
55 single_priority: Option<i32>,
56 next_priority_counter: i32, }
58
59impl BacktestDataIterator {
60 #[must_use]
62 pub fn new() -> Self {
63 Self {
64 streams: AHashMap::new(),
65 names: AHashMap::new(),
66 priorities: AHashMap::new(),
67 indices: AHashMap::new(),
68 heap: BinaryHeap::new(),
69 single_priority: None,
70 next_priority_counter: 0,
71 }
72 }
73
74 pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
77 if data.is_empty() {
78 return;
79 }
80
81 data.sort_by_key(HasTsInit::ts_init);
83
84 let priority = if let Some(p) = self.priorities.get(name) {
85 *p
87 } else {
88 self.next_priority_counter += 1;
89 let sign = if append_data { 1 } else { -1 };
90 sign * self.next_priority_counter
91 };
92
93 self.remove_data(name, true);
95
96 self.streams.insert(priority, data);
97 self.names.insert(priority, name.to_string());
98 self.priorities.insert(name.to_string(), priority);
99 self.indices.insert(priority, 0);
100
101 self.rebuild_heap();
102 }
103
104 pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
107 if let Some(priority) = self.priorities.remove(name) {
108 self.streams.remove(&priority);
109 self.indices.remove(&priority);
110 self.names.remove(&priority);
111
112 self.heap.retain(|e| e.priority != priority);
114
115 if self.heap.is_empty() {
116 self.single_priority = None;
117 }
118 }
119 if complete_remove {
120 }
122 }
123
124 pub fn set_index(&mut self, name: &str, index: usize) {
126 if let Some(priority) = self.priorities.get(name) {
127 self.indices.insert(*priority, index);
128 self.rebuild_heap();
129 }
130 }
131
132 pub fn reset_all_cursors(&mut self) {
134 for idx in self.indices.values_mut() {
135 *idx = 0;
136 }
137 self.rebuild_heap();
138 }
139
140 #[allow(clippy::should_implement_trait)]
142 pub fn next(&mut self) -> Option<Data> {
143 if let Some(p) = self.single_priority {
145 let data = self.streams.get_mut(&p)?;
146 let idx = self.indices.get_mut(&p)?;
147 if *idx >= data.len() {
148 return None;
149 }
150 let element = data[*idx].clone();
151 *idx += 1;
152 return Some(element);
153 }
154
155 let entry = self.heap.pop()?;
157 let stream_vec = self.streams.get(&entry.priority)?;
158 let element = stream_vec[entry.index].clone();
159
160 let next_index = entry.index + 1;
162 self.indices.insert(entry.priority, next_index);
163 if next_index < stream_vec.len() {
164 self.heap.push(HeapEntry {
165 ts: stream_vec[next_index].ts_init(),
166 priority: entry.priority,
167 index: next_index,
168 });
169 }
170
171 Some(element)
172 }
173
174 #[must_use]
175 pub fn is_done(&self) -> bool {
176 if let Some(p) = self.single_priority {
177 if let Some(idx) = self.indices.get(&p)
178 && let Some(vec) = self.streams.get(&p)
179 {
180 return *idx >= vec.len();
181 }
182 true
183 } else {
184 self.heap.is_empty()
185 }
186 }
187
188 fn rebuild_heap(&mut self) {
189 self.heap.clear();
190
191 if self.streams.len() == 1 {
193 self.single_priority = self.streams.keys().next().copied();
194 return;
195 }
196 self.single_priority = None;
197
198 for (&priority, vec) in &self.streams {
199 let idx = *self.indices.get(&priority).unwrap_or(&0);
200 if idx < vec.len() {
201 self.heap.push(HeapEntry {
202 ts: vec[idx].ts_init(),
203 priority,
204 index: idx,
205 });
206 }
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id` with both timestamp arguments set to `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let tick = QuoteTick::new(
            InstrumentId::from(id),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from(100),
            Quantity::from(100),
            ts.into(),
            ts.into(),
        );
        Data::Quote(tick)
    }

    /// Builds one quote per entry of `stamps`, all for instrument `id`, in the given order.
    fn ticks(id: &str, stamps: &[u64]) -> Vec<Data> {
        stamps.iter().map(|&ts| quote(id, ts)).collect()
    }

    /// Drains `it` and returns the `ts_init` of every yielded element, in order.
    fn collect_ts(it: &mut BacktestDataIterator) -> Vec<u64> {
        std::iter::from_fn(|| it.next())
            .map(|item| item.ts_init().as_u64())
            .collect()
    }

    /// A single sorted stream is yielded unchanged and then reports done.
    #[rstest]
    fn test_single_stream_yields_in_order() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[100, 200, 300]), true);

        let seen = collect_ts(&mut iter);

        assert_eq!(seen, [100, 200, 300]);
        assert!(iter.is_done());
    }

    /// After the last element of a single stream, `next` returns `None`.
    #[rstest]
    fn test_single_stream_exhaustion_returns_none() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1, 3]), true);

        for expected in [1_u64, 3] {
            assert_eq!(iter.next().unwrap().ts_init(), UnixNanos::from(expected));
        }
        assert!(iter.next().is_none());
    }

    /// Unsorted input is sorted by `ts_init` when added.
    #[rstest]
    fn test_single_stream_sorts_unsorted_input() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[300, 100, 200]), true);

        assert_eq!(collect_ts(&mut iter), [100, 200, 300]);
    }

    /// Two interleaved streams (one appended, one prepended) merge chronologically.
    #[rstest]
    fn test_two_stream_merge_chronological() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s1", ticks("A.B", &[1, 4]), true);
        iter.add_data("s2", ticks("C.D", &[2, 3]), false);

        assert_eq!(collect_ts(&mut iter), [1, 2, 3, 4]);
    }

    /// Three interleaved streams yield every element in non-decreasing order.
    #[rstest]
    fn test_three_stream_merge_sorted() {
        let per_stream: u64 = 5;
        let build = |id: &str, offset: u64| -> Vec<Data> {
            (0..per_stream)
                .map(|k| quote(id, 3 * k + offset))
                .collect()
        };

        let mut iter = BacktestDataIterator::new();
        iter.add_data("d0", build("A.B", 0), true);
        iter.add_data("d1", build("C.D", 1), true);
        iter.add_data("d2", build("E.F", 2), true);

        let ts = collect_ts(&mut iter);

        assert_eq!(ts.len(), 15);
        for (i, pair) in ts.windows(2).enumerate() {
            assert!(pair[0] <= pair[1], "Not sorted at index {i}");
        }
    }

    /// Two appended streams with alternating timestamps merge in order.
    #[rstest]
    fn test_multiple_streams_merge_order() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s1", ticks("A.B", &[100, 300]), true);
        iter.add_data("s2", ticks("C.D", &[200, 400]), true);

        assert_eq!(collect_ts(&mut iter), [100, 200, 300, 400]);
    }

    /// Equal timestamps from two appended streams are both yielded.
    #[rstest]
    fn test_append_data_priority_default_fifo() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("a", ticks("A.B", &[100]), true);
        iter.add_data("b", ticks("C.D", &[100]), true);

        let seen = collect_ts(&mut iter);

        assert_eq!(seen, [100, 100]);
    }

    /// On a timestamp tie the prepended stream is yielded before the appended one.
    #[rstest]
    fn test_prepend_priority_wins_ties() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("a", ticks("A.B", &[100]), true);
        iter.add_data("b", ticks("C.D", &[100]), false);

        let winner = iter.next().unwrap();
        let runner_up = iter.next().unwrap();

        assert_eq!(winner.instrument_id(), InstrumentId::from("C.D"));
        assert_eq!(runner_up.instrument_id(), InstrumentId::from("A.B"));
    }

    /// A freshly created iterator is already done.
    #[rstest]
    fn test_is_done_empty_iterator() {
        assert!(BacktestDataIterator::new().is_done());
    }

    /// A single-element stream flips to done after one `next` call.
    #[rstest]
    fn test_is_done_after_consumption() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1]), true);

        assert!(!iter.is_done());
        let _ = iter.next();
        assert!(iter.is_done());
    }

    /// With two streams, done is only reported once both elements were consumed.
    #[rstest]
    fn test_is_done_multi_stream() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s1", ticks("A.B", &[1]), true);
        iter.add_data("s2", ticks("C.D", &[2]), true);

        for _ in 0..2 {
            assert!(!iter.is_done());
            let _ = iter.next();
        }
        assert!(iter.is_done());
    }

    /// Draining after a partial read resumes where the read stopped.
    #[rstest]
    fn test_partial_consumption_then_complete() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[0, 1, 2, 3]), true);

        for expected in 0..2_u64 {
            assert_eq!(iter.next().unwrap().ts_init().as_u64(), expected);
        }

        let rest = collect_ts(&mut iter);
        assert_eq!(rest, [2, 3]);
        assert!(iter.is_done());
    }

    /// Removing one of two streams leaves only the other stream's data.
    #[rstest]
    fn test_remove_stream_reduces_output() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("a", ticks("A.B", &[1]), true);
        iter.add_data("b", ticks("C.D", &[2]), true);

        iter.remove_data("a", false);

        assert_eq!(collect_ts(&mut iter), [2]);
    }

    /// Removing every stream leaves an exhausted iterator.
    #[rstest]
    fn test_remove_all_streams_yields_empty() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("x", ticks("A.B", &[1]), true);
        iter.add_data("y", ticks("C.D", &[2]), true);

        for name in ["x", "y"] {
            iter.remove_data(name, false);
        }

        assert!(iter.next().is_none());
        assert!(iter.is_done());
    }

    /// Removing an unknown name does not disturb existing streams.
    #[rstest]
    fn test_remove_nonexistent_stream_is_noop() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1]), true);

        iter.remove_data("nonexistent", false);

        assert_eq!(collect_ts(&mut iter), [1]);
    }

    /// Removing an already drained stream keeps the iterator done.
    #[rstest]
    fn test_remove_after_full_consumption() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1, 2]), true);

        let _ = collect_ts(&mut iter);

        iter.remove_data("s", true);
        assert!(iter.is_done());
    }

    /// Setting the index back to zero replays the stream from the start.
    #[rstest]
    fn test_set_index_rewinds_stream() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[10, 20, 30]), true);

        assert_eq!(iter.next().unwrap().ts_init().as_u64(), 10);

        iter.set_index("s", 0);

        assert_eq!(collect_ts(&mut iter), [10, 20, 30]);
    }

    /// Setting the index forward skips the elements before it.
    #[rstest]
    fn test_set_index_skips_forward() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[10, 20, 30]), true);

        iter.set_index("s", 2);

        assert_eq!(collect_ts(&mut iter), [30]);
    }

    /// Setting the index of an unknown name does not disturb existing streams.
    #[rstest]
    fn test_set_index_nonexistent_stream_is_noop() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1]), true);

        iter.set_index("nonexistent", 0);

        assert_eq!(collect_ts(&mut iter), [1]);
    }

    /// Resetting cursors makes a drained single stream replayable.
    #[rstest]
    fn test_reset_all_cursors_single_stream() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s", ticks("A.B", &[1, 2]), true);

        let _ = collect_ts(&mut iter);
        assert!(iter.is_done());

        iter.reset_all_cursors();

        assert!(!iter.is_done());
        assert_eq!(collect_ts(&mut iter), [1, 2]);
    }

    /// Resetting cursors makes drained multiple streams replayable in merged order.
    #[rstest]
    fn test_reset_all_cursors_multi_stream() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("s1", ticks("A.B", &[1, 3]), true);
        iter.add_data("s2", ticks("C.D", &[2, 4]), true);

        let _ = collect_ts(&mut iter);
        assert!(iter.is_done());

        iter.reset_all_cursors();

        assert_eq!(collect_ts(&mut iter), [1, 2, 3, 4]);
    }

    /// Adding data under an existing name replaces the earlier data.
    #[rstest]
    fn test_readding_data_replaces_stream() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("X", ticks("A.B", &[1, 2]), true);
        iter.add_data("X", ticks("A.B", &[10]), true);

        assert_eq!(collect_ts(&mut iter), [10]);
    }

    /// Adding an empty vector registers nothing.
    #[rstest]
    fn test_add_empty_data_is_noop() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("empty", Vec::new(), true);

        assert!(iter.is_done());
        assert!(iter.next().is_none());
    }

    /// An iterator without streams yields nothing and is done.
    #[rstest]
    fn test_empty_iterator_returns_none() {
        let mut iter = BacktestDataIterator::new();

        assert!(iter.next().is_none());
        assert!(iter.is_done());
    }

    /// Separately named batches for the same instrument are merged chronologically.
    #[rstest]
    fn test_multiple_add_data_calls_with_different_names() {
        let mut iter = BacktestDataIterator::new();
        iter.add_data("batch_0", ticks("A.B", &[1, 3]), true);
        iter.add_data("batch_1", ticks("A.B", &[2, 4]), true);

        assert_eq!(collect_ts(&mut iter), [1, 2, 3, 4]);
    }
}