nautilus_backtest/
data_iterator.rs1#![allow(dead_code)]
17#![allow(clippy::module_name_repetitions)]
18
19use std::collections::{BinaryHeap, HashMap};
20
21use nautilus_core::UnixNanos;
22use nautilus_model::data::{Data, HasTsInit};
23
24#[derive(Debug, Eq, PartialEq)]
26struct HeapEntry {
27 ts: UnixNanos,
28 priority: i32,
29 index: usize,
30}
31
32impl Ord for HeapEntry {
33 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
34 self.ts
36 .cmp(&other.ts)
37 .then_with(|| self.priority.cmp(&other.priority))
38 .then_with(|| self.index.cmp(&other.index))
39 .reverse() }
41}
42
43impl PartialOrd for HeapEntry {
44 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49#[derive(Debug, Default)]
51pub struct BacktestDataIterator {
52 streams: HashMap<i32, Vec<Data>>, names: HashMap<i32, String>, priorities: HashMap<String, i32>, indices: HashMap<i32, usize>, heap: BinaryHeap<HeapEntry>,
57 single_priority: Option<i32>,
58 next_priority_counter: i32, }
60
61impl BacktestDataIterator {
62 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 streams: HashMap::new(),
67 names: HashMap::new(),
68 priorities: HashMap::new(),
69 indices: HashMap::new(),
70 heap: BinaryHeap::new(),
71 single_priority: None,
72 next_priority_counter: 0,
73 }
74 }
75
76 pub fn add_data(&mut self, name: &str, mut data: Vec<Data>, append_data: bool) {
79 if data.is_empty() {
80 return;
81 }
82
83 data.sort_by_key(nautilus_model::data::HasTsInit::ts_init);
85
86 let priority = if let Some(p) = self.priorities.get(name) {
87 *p
89 } else {
90 self.next_priority_counter += 1;
91 let sign = if append_data { 1 } else { -1 };
92 sign * self.next_priority_counter
93 };
94
95 self.remove_data(name, true);
97
98 self.streams.insert(priority, data);
99 self.names.insert(priority, name.to_string());
100 self.priorities.insert(name.to_string(), priority);
101 self.indices.insert(priority, 0);
102
103 self.rebuild_heap();
104 }
105
106 pub fn remove_data(&mut self, name: &str, complete_remove: bool) {
109 if let Some(priority) = self.priorities.remove(name) {
110 self.streams.remove(&priority);
111 self.indices.remove(&priority);
112 self.names.remove(&priority);
113
114 self.heap.retain(|e| e.priority != priority);
116
117 if self.heap.is_empty() {
118 self.single_priority = None;
119 }
120 }
121 if complete_remove {
122 }
124 }
125
126 pub fn set_index(&mut self, name: &str, index: usize) {
128 if let Some(priority) = self.priorities.get(name) {
129 self.indices.insert(*priority, index);
130 self.rebuild_heap();
131 }
132 }
133
134 #[allow(clippy::should_implement_trait)]
136 pub fn next(&mut self) -> Option<Data> {
137 if let Some(p) = self.single_priority {
139 let data = self.streams.get_mut(&p)?;
140 let idx = self.indices.get_mut(&p)?;
141 if *idx >= data.len() {
142 return None;
143 }
144 let element = data[*idx].clone();
145 *idx += 1;
146 return Some(element);
147 }
148
149 let entry = self.heap.pop()?;
151 let stream_vec = self.streams.get(&entry.priority)?;
152 let element = stream_vec[entry.index].clone();
153
154 let next_index = entry.index + 1;
156 self.indices.insert(entry.priority, next_index);
157 if next_index < stream_vec.len() {
158 self.heap.push(HeapEntry {
159 ts: stream_vec[next_index].ts_init(),
160 priority: entry.priority,
161 index: next_index,
162 });
163 }
164
165 Some(element)
166 }
167
168 #[must_use]
169 pub fn is_done(&self) -> bool {
170 if let Some(p) = self.single_priority {
171 if let Some(idx) = self.indices.get(&p)
172 && let Some(vec) = self.streams.get(&p)
173 {
174 return *idx >= vec.len();
175 }
176 true
177 } else {
178 self.heap.is_empty()
179 }
180 }
181
182 fn rebuild_heap(&mut self) {
183 self.heap.clear();
184
185 if self.streams.len() == 1 {
187 self.single_priority = self.streams.keys().next().copied();
188 return;
189 }
190 self.single_priority = None;
191
192 for (&priority, vec) in &self.streams {
193 let idx = *self.indices.get(&priority).unwrap_or(&0);
194 if idx < vec.len() {
195 self.heap.push(HeapEntry {
196 ts: vec[idx].ts_init(),
197 priority,
198 index: idx,
199 });
200 }
201 }
202 }
203}
204
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::QuoteTick,
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Builds a quote for instrument `id`. Bid and ask are both 1.0 with size 100, and
    /// `ts_event` and `ts_init` are both set to `ts`.
    fn quote(id: &str, ts: u64) -> Data {
        let instrument_id = InstrumentId::from(id);
        let bid = Price::from("1.0");
        let ask = Price::from("1.0");
        let bid_size = Quantity::from(100);
        let ask_size = Quantity::from(100);
        let tick = QuoteTick::new(
            instrument_id,
            bid,
            ask,
            bid_size,
            ask_size,
            ts.into(),
            ts.into(),
        );
        Data::Quote(tick)
    }

    /// Drains `it` completely and returns the `ts_init` of every yielded element, in order.
    fn drain_ts(it: &mut BacktestDataIterator) -> Vec<UnixNanos> {
        std::iter::from_fn(|| it.next())
            .map(|d| d.ts_init())
            .collect()
    }

    #[rstest]
    fn test_single_stream() {
        let mut it = BacktestDataIterator::new();
        let id = "BTC-PERP.BINANCE";
        it.add_data("main", vec![quote(id, 1), quote(id, 3)], true);

        let first = it.next().unwrap();
        assert_eq!(first.ts_init(), UnixNanos::from(1));
        let second = it.next().unwrap();
        assert_eq!(second.ts_init(), UnixNanos::from(3));
        assert!(it.next().is_none());
    }

    #[rstest]
    fn test_two_stream_merge() {
        let mut it = BacktestDataIterator::new();
        it.add_data("s1", vec![quote("A.B", 1), quote("A.B", 4)], true);
        it.add_data("s2", vec![quote("C.D", 2), quote("C.D", 3)], false);

        let expected: Vec<UnixNanos> = [1_u64, 2, 3, 4]
            .into_iter()
            .map(UnixNanos::from)
            .collect();
        assert_eq!(drain_ts(&mut it), expected);
    }
}