nautilus_backtest/
accumulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cmp::Ordering, collections::BinaryHeap};
17
18use nautilus_common::{clock::TestClock, timer::TimeEventHandler};
19use nautilus_core::UnixNanos;
20
21/// Wrapper for heap ordering (earlier timestamps = higher priority).
22///
23/// Uses reverse ordering so the BinaryHeap (max-heap) behaves as a min-heap.
24#[derive(Clone, Debug)]
25pub struct ScheduledTimeEventHandler(pub TimeEventHandler);
26
27impl PartialEq for ScheduledTimeEventHandler {
28    fn eq(&self, other: &Self) -> bool {
29        self.0.event.ts_event == other.0.event.ts_event
30    }
31}
32
33impl Eq for ScheduledTimeEventHandler {}
34
35impl PartialOrd for ScheduledTimeEventHandler {
36    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37        Some(self.cmp(other))
38    }
39}
40
41impl Ord for ScheduledTimeEventHandler {
42    fn cmp(&self, other: &Self) -> Ordering {
43        // Reverse ordering for min-heap behavior (earlier timestamps = higher priority)
44        other.0.event.ts_event.cmp(&self.0.event.ts_event)
45    }
46}
47
48/// Provides a means of accumulating and draining time event handlers using a priority queue.
49///
50/// Events are maintained in timestamp order using a binary heap, allowing efficient
51/// retrieval of the next event to process.
52#[derive(Debug)]
53pub struct TimeEventAccumulator {
54    heap: BinaryHeap<ScheduledTimeEventHandler>,
55}
56
57impl TimeEventAccumulator {
58    /// Creates a new [`TimeEventAccumulator`] instance.
59    #[must_use]
60    pub fn new() -> Self {
61        Self {
62            heap: BinaryHeap::new(),
63        }
64    }
65
66    /// Advance the given clock to the `to_time_ns` and push events to the heap.
67    pub fn advance_clock(&mut self, clock: &mut TestClock, to_time_ns: UnixNanos, set_time: bool) {
68        let events = clock.advance_time(to_time_ns, set_time);
69        let handlers = clock.match_handlers(events);
70        for handler in handlers {
71            self.heap.push(ScheduledTimeEventHandler(handler));
72        }
73    }
74
75    /// Peek at the next event timestamp without removing it.
76    ///
77    /// Returns `None` if the heap is empty.
78    #[must_use]
79    pub fn peek_next_time(&self) -> Option<UnixNanos> {
80        self.heap.peek().map(|h| h.0.event.ts_event)
81    }
82
83    /// Pop the next event if its timestamp is at or before `ts`.
84    ///
85    /// Returns `None` if the heap is empty or the next event is after `ts`.
86    pub fn pop_next_at_or_before(&mut self, ts: UnixNanos) -> Option<TimeEventHandler> {
87        if self.heap.peek().is_some_and(|h| h.0.event.ts_event <= ts) {
88            self.heap.pop().map(|h| h.0)
89        } else {
90            None
91        }
92    }
93
94    /// Check if the heap is empty.
95    #[must_use]
96    pub fn is_empty(&self) -> bool {
97        self.heap.is_empty()
98    }
99
100    /// Get the number of events in the heap.
101    #[must_use]
102    pub fn len(&self) -> usize {
103        self.heap.len()
104    }
105
106    /// Clear all events from the heap.
107    pub fn clear(&mut self) {
108        self.heap.clear();
109    }
110
111    /// Drain all events from the heap in timestamp order.
112    ///
113    /// This is provided for backwards compatibility with code that expects
114    /// batch processing. For iterative processing, prefer `pop_next_at_or_before`.
115    pub fn drain(&mut self) -> Vec<TimeEventHandler> {
116        let mut handlers = Vec::with_capacity(self.heap.len());
117        while let Some(scheduled) = self.heap.pop() {
118            handlers.push(scheduled.0);
119        }
120        handlers
121    }
122}
123
124impl Default for TimeEventAccumulator {
125    /// Creates a new default [`TimeEventAccumulator`] instance.
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131#[cfg(all(test, feature = "python"))]
132mod tests {
133    use nautilus_common::timer::{TimeEvent, TimeEventCallback};
134    use nautilus_core::UUID4;
135    use pyo3::{Py, Python, prelude::*, types::PyList};
136    use rstest::*;
137    use ustr::Ustr;
138
139    use super::*;
140
141    #[rstest]
142    fn test_accumulator_pop_in_order() {
143        Python::initialize();
144        Python::attach(|py| {
145            let py_list = PyList::empty(py);
146            let py_append = Py::from(py_list.getattr("append").unwrap());
147
148            let mut accumulator = TimeEventAccumulator::new();
149
150            let time_event1 = TimeEvent::new(
151                Ustr::from("TEST_EVENT_1"),
152                UUID4::new(),
153                100.into(),
154                100.into(),
155            );
156            let time_event2 = TimeEvent::new(
157                Ustr::from("TEST_EVENT_2"),
158                UUID4::new(),
159                300.into(),
160                300.into(),
161            );
162            let time_event3 = TimeEvent::new(
163                Ustr::from("TEST_EVENT_3"),
164                UUID4::new(),
165                200.into(),
166                200.into(),
167            );
168
169            let callback = TimeEventCallback::from(py_append.into_any());
170
171            let handler1 = TimeEventHandler::new(time_event1.clone(), callback.clone());
172            let handler2 = TimeEventHandler::new(time_event2.clone(), callback.clone());
173            let handler3 = TimeEventHandler::new(time_event3.clone(), callback);
174
175            accumulator.heap.push(ScheduledTimeEventHandler(handler1));
176            accumulator.heap.push(ScheduledTimeEventHandler(handler2));
177            accumulator.heap.push(ScheduledTimeEventHandler(handler3));
178            assert_eq!(accumulator.len(), 3);
179
180            let popped1 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
181            assert_eq!(popped1.event.ts_event, time_event1.ts_event);
182
183            let popped2 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
184            assert_eq!(popped2.event.ts_event, time_event3.ts_event);
185
186            let popped3 = accumulator.pop_next_at_or_before(1000.into()).unwrap();
187            assert_eq!(popped3.event.ts_event, time_event2.ts_event);
188
189            assert!(accumulator.is_empty());
190        });
191    }
192
193    #[rstest]
194    fn test_accumulator_pop_respects_timestamp() {
195        Python::initialize();
196        Python::attach(|py| {
197            let py_list = PyList::empty(py);
198            let py_append = Py::from(py_list.getattr("append").unwrap());
199
200            let mut accumulator = TimeEventAccumulator::new();
201
202            let time_event1 = TimeEvent::new(
203                Ustr::from("TEST_EVENT_1"),
204                UUID4::new(),
205                100.into(),
206                100.into(),
207            );
208            let time_event2 = TimeEvent::new(
209                Ustr::from("TEST_EVENT_2"),
210                UUID4::new(),
211                300.into(),
212                300.into(),
213            );
214
215            let callback = TimeEventCallback::from(py_append.into_any());
216
217            accumulator
218                .heap
219                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
220                    time_event1.clone(),
221                    callback.clone(),
222                )));
223            accumulator
224                .heap
225                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
226                    time_event2.clone(),
227                    callback,
228                )));
229
230            let popped1 = accumulator.pop_next_at_or_before(200.into()).unwrap();
231            assert_eq!(popped1.event.ts_event, time_event1.ts_event);
232
233            // Event at 300 should not be returned with ts=200
234            assert!(accumulator.pop_next_at_or_before(200.into()).is_none());
235
236            let popped2 = accumulator.pop_next_at_or_before(300.into()).unwrap();
237            assert_eq!(popped2.event.ts_event, time_event2.ts_event);
238        });
239    }
240
241    #[rstest]
242    fn test_peek_next_time() {
243        Python::initialize();
244        Python::attach(|py| {
245            let py_list = PyList::empty(py);
246            let py_append = Py::from(py_list.getattr("append").unwrap());
247            let callback = TimeEventCallback::from(py_append.into_any());
248
249            let mut accumulator = TimeEventAccumulator::new();
250            assert!(accumulator.peek_next_time().is_none());
251
252            let time_event1 = TimeEvent::new(
253                Ustr::from("TEST_EVENT_1"),
254                UUID4::new(),
255                200.into(),
256                200.into(),
257            );
258            let time_event2 = TimeEvent::new(
259                Ustr::from("TEST_EVENT_2"),
260                UUID4::new(),
261                100.into(),
262                100.into(),
263            );
264
265            accumulator
266                .heap
267                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
268                    time_event1,
269                    callback.clone(),
270                )));
271            assert_eq!(accumulator.peek_next_time(), Some(200.into()));
272
273            accumulator
274                .heap
275                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
276                    time_event2,
277                    callback,
278                )));
279            assert_eq!(accumulator.peek_next_time(), Some(100.into()));
280        });
281    }
282
283    #[rstest]
284    fn test_drain_returns_in_order() {
285        Python::initialize();
286        Python::attach(|py| {
287            let py_list = PyList::empty(py);
288            let py_append = Py::from(py_list.getattr("append").unwrap());
289            let callback = TimeEventCallback::from(py_append.into_any());
290
291            let mut accumulator = TimeEventAccumulator::new();
292
293            for ts in [300u64, 100, 200] {
294                let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
295                accumulator
296                    .heap
297                    .push(ScheduledTimeEventHandler(TimeEventHandler::new(
298                        event,
299                        callback.clone(),
300                    )));
301            }
302
303            let handlers = accumulator.drain();
304
305            assert_eq!(handlers.len(), 3);
306            assert_eq!(handlers[0].event.ts_event.as_u64(), 100);
307            assert_eq!(handlers[1].event.ts_event.as_u64(), 200);
308            assert_eq!(handlers[2].event.ts_event.as_u64(), 300);
309            assert!(accumulator.is_empty());
310        });
311    }
312
313    #[rstest]
314    fn test_interleaved_push_pop_maintains_order() {
315        Python::initialize();
316        Python::attach(|py| {
317            let py_list = PyList::empty(py);
318            let py_append = Py::from(py_list.getattr("append").unwrap());
319            let callback = TimeEventCallback::from(py_append.into_any());
320
321            let mut accumulator = TimeEventAccumulator::new();
322            let mut popped_timestamps: Vec<u64> = Vec::new();
323
324            for ts in [100u64, 300] {
325                let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), ts.into(), ts.into());
326                accumulator
327                    .heap
328                    .push(ScheduledTimeEventHandler(TimeEventHandler::new(
329                        event,
330                        callback.clone(),
331                    )));
332            }
333
334            let handler = accumulator.pop_next_at_or_before(1000.into()).unwrap();
335            popped_timestamps.push(handler.event.ts_event.as_u64());
336
337            // Simulate callback scheduling new event at 150 (between popped 100 and pending 300)
338            let event = TimeEvent::new(Ustr::from("NEW"), UUID4::new(), 150.into(), 150.into());
339            accumulator
340                .heap
341                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
342                    event, callback,
343                )));
344
345            while let Some(handler) = accumulator.pop_next_at_or_before(1000.into()) {
346                popped_timestamps.push(handler.event.ts_event.as_u64());
347            }
348
349            assert_eq!(popped_timestamps, vec![100, 150, 300]);
350        });
351    }
352
353    #[rstest]
354    fn test_same_timestamp_events() {
355        Python::initialize();
356        Python::attach(|py| {
357            let py_list = PyList::empty(py);
358            let py_append = Py::from(py_list.getattr("append").unwrap());
359            let callback = TimeEventCallback::from(py_append.into_any());
360
361            let mut accumulator = TimeEventAccumulator::new();
362
363            for i in 0..3 {
364                let event = TimeEvent::new(
365                    Ustr::from(&format!("EVENT_{i}")),
366                    UUID4::new(),
367                    100.into(),
368                    100.into(),
369                );
370                accumulator
371                    .heap
372                    .push(ScheduledTimeEventHandler(TimeEventHandler::new(
373                        event,
374                        callback.clone(),
375                    )));
376            }
377
378            let mut count = 0;
379            while let Some(handler) = accumulator.pop_next_at_or_before(100.into()) {
380                assert_eq!(handler.event.ts_event.as_u64(), 100);
381                count += 1;
382            }
383            assert_eq!(count, 3);
384        });
385    }
386
387    #[rstest]
388    fn test_pop_at_exact_timestamp_boundary() {
389        Python::initialize();
390        Python::attach(|py| {
391            let py_list = PyList::empty(py);
392            let py_append = Py::from(py_list.getattr("append").unwrap());
393            let callback = TimeEventCallback::from(py_append.into_any());
394
395            let mut accumulator = TimeEventAccumulator::new();
396
397            let event = TimeEvent::new(Ustr::from("TEST"), UUID4::new(), 100.into(), 100.into());
398            accumulator
399                .heap
400                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
401                    event, callback,
402                )));
403
404            let handler = accumulator.pop_next_at_or_before(100.into());
405            assert!(handler.is_some());
406            assert_eq!(handler.unwrap().event.ts_event.as_u64(), 100);
407
408            let event2 = TimeEvent::new(Ustr::from("TEST2"), UUID4::new(), 200.into(), 200.into());
409            accumulator
410                .heap
411                .push(ScheduledTimeEventHandler(TimeEventHandler::new(
412                    event2,
413                    TimeEventCallback::from(
414                        Py::from(py_list.getattr("append").unwrap()).into_any(),
415                    ),
416                )));
417
418            assert!(accumulator.pop_next_at_or_before(199.into()).is_none());
419            assert!(accumulator.pop_next_at_or_before(200.into()).is_some());
420        });
421    }
422}