1use std::{cmp::Ordering, collections::BinaryHeap};
17
18use nautilus_common::{clock::TestClock, timer::TimeEventHandler};
19use nautilus_core::UnixNanos;
20
21#[derive(Clone, Debug)]
25pub struct ScheduledTimeEventHandler(pub TimeEventHandler);
26
27impl PartialEq for ScheduledTimeEventHandler {
28 fn eq(&self, other: &Self) -> bool {
29 self.0.event.ts_event == other.0.event.ts_event
30 }
31}
32
33impl Eq for ScheduledTimeEventHandler {}
34
35impl PartialOrd for ScheduledTimeEventHandler {
36 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37 Some(self.cmp(other))
38 }
39}
40
41impl Ord for ScheduledTimeEventHandler {
42 fn cmp(&self, other: &Self) -> Ordering {
43 other.0.event.ts_event.cmp(&self.0.event.ts_event)
45 }
46}
47
48#[derive(Debug)]
53pub struct TimeEventAccumulator {
54 heap: BinaryHeap<ScheduledTimeEventHandler>,
55}
56
57impl TimeEventAccumulator {
58 #[must_use]
60 pub fn new() -> Self {
61 Self {
62 heap: BinaryHeap::new(),
63 }
64 }
65
66 pub fn advance_clock(&mut self, clock: &mut TestClock, to_time_ns: UnixNanos, set_time: bool) {
68 let events = clock.advance_time(to_time_ns, set_time);
69 let handlers = clock.match_handlers(events);
70 for handler in handlers {
71 self.heap.push(ScheduledTimeEventHandler(handler));
72 }
73 }
74
75 #[must_use]
79 pub fn peek_next_time(&self) -> Option<UnixNanos> {
80 self.heap.peek().map(|h| h.0.event.ts_event)
81 }
82
83 pub fn pop_next_at_or_before(&mut self, ts: UnixNanos) -> Option<TimeEventHandler> {
87 if self.heap.peek().is_some_and(|h| h.0.event.ts_event <= ts) {
88 self.heap.pop().map(|h| h.0)
89 } else {
90 None
91 }
92 }
93
94 #[must_use]
96 pub fn is_empty(&self) -> bool {
97 self.heap.is_empty()
98 }
99
100 #[must_use]
102 pub fn len(&self) -> usize {
103 self.heap.len()
104 }
105
106 pub fn clear(&mut self) {
108 self.heap.clear();
109 }
110
111 pub fn drain(&mut self) -> Vec<TimeEventHandler> {
116 let mut handlers = Vec::with_capacity(self.heap.len());
117 while let Some(scheduled) = self.heap.pop() {
118 handlers.push(scheduled.0);
119 }
120 handlers
121 }
122}
123
124impl Default for TimeEventAccumulator {
125 fn default() -> Self {
127 Self::new()
128 }
129}
130
131#[cfg(all(test, feature = "python"))]
132mod tests {
133 use nautilus_common::timer::{TimeEvent, TimeEventCallback};
134 use nautilus_core::UUID4;
135 use pyo3::{Py, Python, prelude::*, types::PyList};
136 use rstest::*;
137 use ustr::Ustr;
138
139 use super::*;
140
141 #[rstest]
142 fn test_accumulator_pop_in_order() {
143 Python::initialize();
144 Python::attach(|py| {
145 let py_list = PyList::empty(py);
146 let py_append = Py::from(py_list.getattr("append").unwrap());
147
148 let mut accumulator = TimeEventAccumulator::new();
149
150 let time_event1 = TimeEvent::new(
151 Ustr::from("TEST_EVENT_1"),
152 UUID4::new(),
153 100.into(),
154 100.into(),
155 );
156 let time_event2 = TimeEvent::new(
157 Ustr::from("TEST_EVENT_2"),
158 UUID4::new(),
159 300.into(),
160 300.into(),
161 );
162 let time_event3 = TimeEvent::new(
163 Ustr::from("TEST_EVENT_3"),
164 UUID4::new(),
165 200.into(),
166 200.into(),
167 );
168
169 let callback = TimeEventCallback::from(py_append.into_any());
170
171 let handler1 = TimeEventHandler::new(time_event1.clone(), callback.clone());
172 let handler2 = TimeEventHandler::new(time_event2.clone(), callback.clone());
173 let handler3 = TimeEventHandler::new(time_event3.clone(), callback);
174
175 accumulator.heap.push(ScheduledTimeEventHandler(handler1));
176 accumulator.heap.push(ScheduledTimeEventHandler(handler2));
177 accumulator.heap.push(ScheduledTimeEventHandler(handler3));
178 assert_eq!(accumulator.len(), 3);
179
180 let popped1 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
181 assert_eq!(popped1.event.ts_event, time_event1.ts_event);
182
183 let popped2 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
184 assert_eq!(popped2.event.ts_event, time_event3.ts_event);
185
186 let popped3 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
187 assert_eq!(popped3.event.ts_event, time_event2.ts_event);
188
189 assert!(accumulator.is_empty());
190 });
191 }
192
193 #[rstest]
194 fn test_accumulator_pop_respects_timestamp() {
195 Python::initialize();
196 Python::attach(|py| {
197 let py_list = PyList::empty(py);
198 let py_append = Py::from(py_list.getattr("append").unwrap());
199
200 let mut accumulator = TimeEventAccumulator::new();
201
202 let time_event1 = TimeEvent::new(
203 Ustr::from("TEST_EVENT_1"),
204 UUID4::new(),
205 100.into(),
206 100.into(),
207 );
208 let time_event2 = TimeEvent::new(
209 Ustr::from("TEST_EVENT_2"),
210 UUID4::new(),
211 300.into(),
212 300.into(),
213 );
214
215 let callback = TimeEventCallback::from(py_append.into_any());
216
217 accumulator
218 .heap
219 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
220 time_event1.clone(),
221 callback.clone(),
222 )));
223 accumulator
224 .heap
225 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
226 time_event2.clone(),
227 callback,
228 )));
229
230 let popped1 = accumulator.pop_next_at_or_before(200.into()).unwrap();
231 assert_eq!(popped1.event.ts_event, time_event1.ts_event);
232
233 assert!(accumulator.pop_next_at_or_before(200.into()).is_none());
235
236 let popped2 = accumulator.pop_next_at_or_before(300.into()).unwrap();
237 assert_eq!(popped2.event.ts_event, time_event2.ts_event);
238 });
239 }
240
241 #[rstest]
242 fn test_peek_next_time() {
243 Python::initialize();
244 Python::attach(|py| {
245 let py_list = PyList::empty(py);
246 let py_append = Py::from(py_list.getattr("append").unwrap());
247 let callback = TimeEventCallback::from(py_append.into_any());
248
249 let mut accumulator = TimeEventAccumulator::new();
250 assert!(accumulator.peek_next_time().is_none());
251
252 let time_event1 = TimeEvent::new(
253 Ustr::from("TEST_EVENT_1"),
254 UUID4::new(),
255 200.into(),
256 200.into(),
257 );
258 let time_event2 = TimeEvent::new(
259 Ustr::from("TEST_EVENT_2"),
260 UUID4::new(),
261 100.into(),
262 100.into(),
263 );
264
265 accumulator
266 .heap
267 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
268 time_event1,
269 callback.clone(),
270 )));
271 assert_eq!(accumulator.peek_next_time(), Some(200.into()));
272
273 accumulator
274 .heap
275 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
276 time_event2,
277 callback,
278 )));
279 assert_eq!(accumulator.peek_next_time(), Some(100.into()));
280 });
281 }
282
283 #[rstest]
284 fn test_drain_returns_in_order() {
285 Python::initialize();
286 Python::attach(|py| {
287 let py_list = PyList::empty(py);
288 let py_append = Py::from(py_list.getattr("append").unwrap());
289 let callback = TimeEventCallback::from(py_append.into_any());
290
291 let mut accumulator = TimeEventAccumulator::new();
292
293 for ts in [300u64, 100, 200] {
294 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
295 accumulator
296 .heap
297 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
298 event,
299 callback.clone(),
300 )));
301 }
302
303 let handlers = accumulator.drain();
304
305 assert_eq!(handlers.len(), 3);
306 assert_eq!(handlers[0].event.ts_event.as_u64(), 100);
307 assert_eq!(handlers[1].event.ts_event.as_u64(), 200);
308 assert_eq!(handlers[2].event.ts_event.as_u64(), 300);
309 assert!(accumulator.is_empty());
310 });
311 }
312
313 #[rstest]
314 fn test_interleaved_push_pop_maintains_order() {
315 Python::initialize();
316 Python::attach(|py| {
317 let py_list = PyList::empty(py);
318 let py_append = Py::from(py_list.getattr("append").unwrap());
319 let callback = TimeEventCallback::from(py_append.into_any());
320
321 let mut accumulator = TimeEventAccumulator::new();
322 let mut popped_timestamps: Vec<u64> = Vec::new();
323
324 for ts in [100u64, 300] {
325 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
326 accumulator
327 .heap
328 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
329 event,
330 callback.clone(),
331 )));
332 }
333
334 let handler = accumulator.pop_next_at_or_before(1000.into()).unwrap();
335 popped_timestamps.push(handler.event.ts_event.as_u64());
336
337 let event = TimeEvent::new(Ustr::from("NEW"), UUID4::new(), 150.into(), 150.into());
339 accumulator
340 .heap
341 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
342 event, callback,
343 )));
344
345 while let Some(handler) = accumulator.pop_next_at_or_before(1000.into()) {
346 popped_timestamps.push(handler.event.ts_event.as_u64());
347 }
348
349 assert_eq!(popped_timestamps, vec![100, 150, 300]);
350 });
351 }
352
353 #[rstest]
354 fn test_same_timestamp_events() {
355 Python::initialize();
356 Python::attach(|py| {
357 let py_list = PyList::empty(py);
358 let py_append = Py::from(py_list.getattr("append").unwrap());
359 let callback = TimeEventCallback::from(py_append.into_any());
360
361 let mut accumulator = TimeEventAccumulator::new();
362
363 for i in 0..3 {
364 let event = TimeEvent::new(
365 Ustr::from(&format!("EVENT_{i}")),
366 UUID4::new(),
367 100.into(),
368 100.into(),
369 );
370 accumulator
371 .heap
372 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
373 event,
374 callback.clone(),
375 )));
376 }
377
378 let mut count = 0;
379 while let Some(handler) = accumulator.pop_next_at_or_before(100.into()) {
380 assert_eq!(handler.event.ts_event.as_u64(), 100);
381 count += 1;
382 }
383 assert_eq!(count, 3);
384 });
385 }
386
387 #[rstest]
388 fn test_pop_at_exact_timestamp_boundary() {
389 Python::initialize();
390 Python::attach(|py| {
391 let py_list = PyList::empty(py);
392 let py_append = Py::from(py_list.getattr("append").unwrap());
393 let callback = TimeEventCallback::from(py_append.into_any());
394
395 let mut accumulator = TimeEventAccumulator::new();
396
397 let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), 100.into(), 100.into());
398 accumulator
399 .heap
400 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
401 event, callback,
402 )));
403
404 let handler = accumulator.pop_next_at_or_before(100.into());
405 assert!(handler.is_some());
406 assert_eq!(handler.unwrap().event.ts_event.as_u64(), 100);
407
408 let event2 = TimeEvent::new(Ustr::from("TEST2"), UUID4::new(), 200.into(), 200.into());
409 accumulator
410 .heap
411 .push(ScheduledTimeEventHandler(TimeEventHandler::new(
412 event2,
413 TimeEventCallback::from(
414 Py::from(py_list.getattr("append").unwrap()).into_any(),
415 ),
416 )));
417
418 assert!(accumulator.pop_next_at_or_before(199.into()).is_none());
419 assert!(accumulator.pop_next_at_or_before(200.into()).is_some());
420 });
421 }
422}