1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::{
24 Arc,
25 atomic::{self, AtomicU64},
26 },
27};
28
29use nautilus_core::{
30 UUID4, UnixNanos,
31 correctness::{FAILED, check_valid_string},
32 datetime::floor_to_nearest_microsecond,
33 time::get_atomic_clock_realtime,
34};
35#[cfg(feature = "python")]
36use pyo3::{PyObject, Python};
37use tokio::{
38 task::JoinHandle,
39 time::{Duration, Instant},
40};
41use ustr::Ustr;
42
43use crate::{runner::TimeEventSender, runtime::get_runtime};
44
/// Creates a valid (non-zero) interval from the given `interval_ns` value.
///
/// A value of zero is clamped up to one, so every `u64` input yields a result
/// and this function never panics.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // `NonZeroU64::new` returns `None` only for zero, which maps to the minimum value (1).
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
54
/// A named event stamped with the time it occurs and the time it was initialized.
///
/// Note that equality and ordering use different fields in this file: `PartialEq`
/// compares `event_id` only, while `Ord` compares `ts_event` (in reverse), so two
/// events that compare as `Ordering::Equal` are not necessarily `==`.
#[repr(C)]
#[derive(Clone, Debug, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct TimeEvent {
    /// The event name (timers in this file copy their own name into it).
    pub name: Ustr,
    /// The unique identifier of the event; the sole field used for equality.
    pub event_id: UUID4,
    /// The timestamp at which the event occurs; the sole field used for ordering.
    pub ts_event: UnixNanos,
    /// The timestamp at which the event was initialized.
    // NOTE(review): presumably UNIX nanoseconds, going by the `UnixNanos` type name — confirm.
    pub ts_init: UnixNanos,
}
75
76impl PartialOrd for TimeEvent {
78 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
79 Some(self.cmp(other))
80 }
81}
82
83impl Ord for TimeEvent {
85 fn cmp(&self, other: &Self) -> Ordering {
86 other.ts_event.cmp(&self.ts_event)
87 }
88}
89
90impl TimeEvent {
91 #[must_use]
97 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
98 Self {
99 name,
100 event_id,
101 ts_event,
102 ts_init,
103 }
104 }
105}
106
107impl Display for TimeEvent {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(
110 f,
111 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
112 stringify!(TimeEvent),
113 self.name,
114 self.event_id,
115 self.ts_event,
116 self.ts_init
117 )
118 }
119}
120
121impl PartialEq for TimeEvent {
122 fn eq(&self, other: &Self) -> bool {
123 self.event_id == other.event_id
124 }
125}
126
/// Trait-object type for a Rust time event callback: any `Fn` taking a [`TimeEvent`] by value.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);
128
/// A callback invoked with a [`TimeEvent`], implemented either in Python or in Rust.
pub enum TimeEventCallback {
    /// A Python callable (only available with the `python` feature).
    #[cfg(feature = "python")]
    Python(PyObject),
    /// A reference-counted Rust closure.
    Rust(Rc<RustTimeEventCallback>),
}
134
135impl Clone for TimeEventCallback {
136 fn clone(&self) -> Self {
137 match self {
138 #[cfg(feature = "python")]
139 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
140 Self::Rust(cb) => Self::Rust(cb.clone()),
141 }
142 }
143}
144
145impl Debug for TimeEventCallback {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 match self {
148 #[cfg(feature = "python")]
149 Self::Python(_) => f.write_str("Python callback"),
150 Self::Rust(_) => f.write_str("Rust callback"),
151 }
152 }
153}
154
impl TimeEventCallback {
    /// Invokes the callback with the given `event`.
    ///
    /// # Panics
    ///
    /// Panics if the Python callback returns an error, because the call result is unwrapped.
    pub fn call(&self, event: TimeEvent) {
        match self {
            #[cfg(feature = "python")]
            Self::Python(callback) => {
                // The Python callable is invoked while holding the GIL.
                Python::with_gil(|py| {
                    callback.call1(py, (event,)).unwrap();
                });
            }
            Self::Rust(callback) => callback(event),
        }
    }
}
173
174impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
175 fn from(value: Rc<RustTimeEventCallback>) -> Self {
176 Self::Rust(value)
177 }
178}
179
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    /// Wraps a Python object in the `Python` variant.
    fn from(value: PyObject) -> Self {
        TimeEventCallback::Python(value)
    }
}
186
// SAFETY / NOTE(review): the `Rust` variant holds an `Rc`, which is neither `Send` nor
// `Sync` by itself, so these impls are manual assertions rather than compiler-checked
// facts. In this file `LiveTimer::start` moves a cloned callback into a spawned task and
// clones it again there; soundness presumably depends on how the runtime and callers
// use the callback — confirm before relying on cross-thread use.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
198
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// Equality and ordering of handlers are both based on the event's `ts_event` only.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The time event to deliver.
    pub event: TimeEvent,
    /// The callback that is called with `event` when the handler is run.
    pub callback: TimeEventCallback,
}
211
212impl TimeEventHandlerV2 {
213 #[must_use]
215 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
216 Self { event, callback }
217 }
218
219 pub fn run(self) {
225 let Self { event, callback } = self;
226 callback.call(event);
227 }
228}
229
230impl PartialOrd for TimeEventHandlerV2 {
231 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
232 Some(self.cmp(other))
233 }
234}
235
236impl PartialEq for TimeEventHandlerV2 {
237 fn eq(&self, other: &Self) -> bool {
238 self.event.ts_event == other.event.ts_event
239 }
240}
241
// Marker impl: equality is the `ts_event` comparison defined in `PartialEq`.
impl Eq for TimeEventHandlerV2 {}
243
244impl Ord for TimeEventHandlerV2 {
245 fn cmp(&self, other: &Self) -> Ordering {
246 self.event.ts_event.cmp(&other.event.ts_event)
247 }
248}
249
/// A deterministic timer that is driven manually (via `advance` or its `Iterator`
/// impl) rather than by a clock.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The name of the timer, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer.
    pub start_time_ns: UnixNanos,
    /// The optional stop time; an event exactly at this time is still produced,
    /// after which the timer is expired.
    pub stop_time_ns: Option<UnixNanos>,
    /// If `true` the first event is scheduled at `start_time_ns`, otherwise one
    /// interval after it.
    pub fire_immediately: bool,
    /// The time of the next event to be produced.
    next_time_ns: UnixNanos,
    /// Set once the timer is cancelled or has reached/passed its stop time.
    is_expired: bool,
}
269
270impl TestTimer {
271 #[must_use]
277 pub fn new(
278 name: Ustr,
279 interval_ns: NonZeroU64,
280 start_time_ns: UnixNanos,
281 stop_time_ns: Option<UnixNanos>,
282 fire_immediately: bool,
283 ) -> Self {
284 check_valid_string(name, stringify!(name)).expect(FAILED);
285
286 let next_time_ns = if fire_immediately {
287 start_time_ns
288 } else {
289 start_time_ns + interval_ns.get()
290 };
291
292 Self {
293 name,
294 interval_ns,
295 start_time_ns,
296 stop_time_ns,
297 fire_immediately,
298 next_time_ns,
299 is_expired: false,
300 }
301 }
302
303 #[must_use]
305 pub const fn next_time_ns(&self) -> UnixNanos {
306 self.next_time_ns
307 }
308
309 #[must_use]
311 pub const fn is_expired(&self) -> bool {
312 self.is_expired
313 }
314
315 #[must_use]
316 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
317 TimeEvent {
318 name: self.name,
319 event_id,
320 ts_event: self.next_time_ns,
321 ts_init,
322 }
323 }
324
325 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
331 let advances = if self.next_time_ns <= to_time_ns {
333 (to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get() + 1
334 } else {
335 0
336 };
337 self.take(advances as usize).map(|(event, _)| event)
338 }
339
340 pub const fn cancel(&mut self) {
344 self.is_expired = true;
345 }
346}
347
348impl Iterator for TestTimer {
349 type Item = (TimeEvent, UnixNanos);
350
351 fn next(&mut self) -> Option<Self::Item> {
352 if self.is_expired {
353 None
354 } else {
355 if let Some(stop_time_ns) = self.stop_time_ns
357 && self.next_time_ns > stop_time_ns
358 {
359 self.is_expired = true;
360 return None;
361 }
362
363 let item = (
364 TimeEvent {
365 name: self.name,
366 event_id: UUID4::new(),
367 ts_event: self.next_time_ns,
368 ts_init: self.next_time_ns,
369 },
370 self.next_time_ns,
371 );
372
373 if let Some(stop_time_ns) = self.stop_time_ns
375 && self.next_time_ns == stop_time_ns
376 {
377 self.is_expired = true;
378 }
379
380 self.next_time_ns += self.interval_ns;
381
382 Some(item)
383 }
384 }
385}
386
/// A timer that fires from a task spawned on the runtime, using the realtime clock.
#[derive(Debug)]
pub struct LiveTimer {
    /// The name of the timer, copied into every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer.
    pub start_time_ns: UnixNanos,
    /// The optional stop time after which the timer task ends.
    pub stop_time_ns: Option<UnixNanos>,
    /// If `true` the first event is scheduled at `start_time_ns`, otherwise one
    /// interval after it.
    pub fire_immediately: bool,
    /// The time of the next event, shared with the spawned timer task.
    next_time_ns: Arc<AtomicU64>,
    /// The callback that receives the events.
    callback: TimeEventCallback,
    /// Handle of the spawned timer task; `None` until `start` is called.
    task_handle: Option<JoinHandle<()>>,
    /// Sender used to deliver handlers for `Rust` callbacks; the timer task panics
    /// on fire if the callback is `Rust` and this is `None`.
    sender: Option<Arc<dyn TimeEventSender>>,
}
408
impl LiveTimer {
    /// Creates a new [`LiveTimer`] instance. The timer does not run until `start` is called.
    ///
    /// The first event is scheduled at `start_time_ns` when `fire_immediately` is
    /// `true`, otherwise at `start_time_ns + interval_ns`.
    ///
    /// # Panics
    ///
    /// Panics if `name` fails the `check_valid_string` validation.
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub fn new(
        name: Ustr,
        interval_ns: NonZeroU64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
        callback: TimeEventCallback,
        fire_immediately: bool,
        sender: Option<Arc<dyn TimeEventSender>>,
    ) -> Self {
        check_valid_string(name, stringify!(name)).expect(FAILED);

        let next_time_ns = if fire_immediately {
            start_time_ns.as_u64()
        } else {
            start_time_ns.as_u64() + interval_ns.get()
        };

        log::debug!("Creating timer '{name}'");

        Self {
            name,
            interval_ns,
            start_time_ns,
            stop_time_ns,
            fire_immediately,
            next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
            callback,
            task_handle: None,
            sender,
        }
    }

    /// Returns the time of the next event, read from the value shared with the timer task.
    #[must_use]
    pub fn next_time_ns(&self) -> UnixNanos {
        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
    }

    /// Returns whether the timer task has finished.
    ///
    /// A timer that was never started has no task handle and reports `false`.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        self.task_handle
            .as_ref()
            .is_some_and(tokio::task::JoinHandle::is_finished)
    }

    /// Starts the timer by spawning a task on the runtime that fires an event on
    /// every interval tick.
    ///
    /// If the scheduled next time is not in the future, it is moved to the current
    /// time and a warning is logged.
    ///
    /// # Panics
    ///
    /// The spawned task panics when firing if the callback is the `Rust` variant and
    /// no `sender` was provided.
    // NOTE(review): calling `start` again overwrites `task_handle` without aborting the
    // previously spawned task — confirm callers only start a timer once.
    #[allow(unused_variables)]
    pub fn start(&mut self) {
        // Copy/clone everything the task needs, since it cannot borrow from `self`.
        let event_name = self.name;
        let stop_time_ns = self.stop_time_ns;
        let interval_ns = self.interval_ns.get();
        let callback = self.callback.clone();

        let clock = get_atomic_clock_realtime();
        let now_ns = clock.get_time_ns();

        let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
        // A next time at or before now is moved up to now, both locally and in the shared value.
        if next_time_ns <= now_ns {
            log::warn!(
                "Timer '{event_name}' alert time {next_time_ns} was in the past, adjusted to current time for immediate fire"
            );
            next_time_ns = now_ns.into();
            self.next_time_ns
                .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
        }

        // The task works with the microsecond-floored time; the shared value is not
        // floored until the first tick updates it.
        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
        let next_time_atomic = self.next_time_ns.clone();

        let sender = self.sender.clone();

        let rt = get_runtime();
        let handle = rt.spawn(async move {
            let clock = get_atomic_clock_realtime();

            // The initial delay is shortened by a fixed 1 ms (never below zero).
            // NOTE(review): presumably compensates for scheduling overhead — confirm.
            let overhead = Duration::from_millis(1);
            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
            let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
            let start = Instant::now() + delay;

            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));

            loop {
                timer.tick().await;
                let now_ns = clock.get_time_ns();

                // `ts_event` is the scheduled time, `ts_init` the clock reading at the tick.
                let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);

                match callback {
                    #[cfg(feature = "python")]
                    TimeEventCallback::Python(ref callback) => {
                        call_python_with_time_event(event, callback);
                    }
                    // Rust callbacks are not called here; a handler is sent through
                    // `sender` instead.
                    TimeEventCallback::Rust(_) => {
                        let sender = sender
                            .as_ref()
                            .expect("timer event sender was unset for Rust callback system");
                        let handler = TimeEventHandlerV2::new(event, callback.clone());
                        sender.send(handler);
                    }
                }

                // Schedule the next event and publish it for `next_time_ns()` readers.
                next_time_ns += interval_ns;
                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);

                // End the task once the next scheduled time or the current time reaches
                // the stop time, so no event is fired at or after `stop_time_ns`.
                if let Some(stop_time_ns) = stop_time_ns
                    && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
                {
                    break;
                }
            }
        });

        self.task_handle = Some(handle);
    }

    /// Cancels the timer by aborting its task, if one was started.
    pub fn cancel(&mut self) {
        log::debug!("Cancel timer '{}'", self.name);
        if let Some(ref handle) = self.task_handle {
            handle.abort();
        }
    }
}
564
/// Calls the Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// An error returned by the callback is logged rather than propagated. It is logged
/// through the `log` crate, matching the other log statements in this file.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &PyObject) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        // The capsule is created with a no-op destructor closure.
        let capsule: PyObject = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            log::error!("Error on callback: {e:?}");
        }
    });
}
588
// Unit and property-based tests for `TestTimer`, plus construction checks for `LiveTimer`.
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, rc::Rc};

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{LiveTimer, TestTimer, TimeEvent, TimeEventCallback};

    // Iterating yields events until the timer is marked expired.
    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::from(1),
            None,
            false,
        );

        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    // Advancing to times before the first scheduled event (5) produces nothing.
    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::default(),
            None,
            false,
        );
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    // Advancing exactly to the next time produces a single event.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            None,
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    // The event at the stop time is produced and then expires the timer.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(2)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    // One advance spanning several intervals produces one event per interval.
    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(5)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    // Advancing past the stop time produces no events beyond the stop time.
    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::default(),
            Some(UnixNanos::from(5)),
            false,
        );
        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    // Advancing exactly onto each interval boundary produces exactly one event each time.
    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(0),
            None,
            false,
        );
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");

        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
    }

    // With `fire_immediately = true` the first event is at the start time.
    #[rstest]
    fn test_test_timer_fire_immediately_true() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(10),
            None,
            true,
        );

        // Next time starts at the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));

        // Advancing to the start time fires once.
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(10));

        // Next time is then one interval later.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
    }

    // With `fire_immediately = false` the first event is one interval after the start time.
    #[rstest]
    fn test_test_timer_fire_immediately_false() {
        let mut timer = TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(5).unwrap(),
            UnixNanos::from(10),
            None,
            false,
        );

        // Next time starts at start time plus interval.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));

        // Advancing to the start time fires nothing.
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(events.len(), 0);

        // Advancing to the first interval fires once.
        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(15));
    }

    // Construction only (the timer is never started): field and initial next time.
    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::Rust(Rc::new(|_| {})),
            true,
            None,
        );

        assert!(timer.fire_immediately);

        // Next time equals the start time.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    // Construction only (the timer is never started): field and initial next time.
    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::Rust(Rc::new(|_| {})),
            false,
            None,
        );

        assert!(!timer.fire_immediately);

        // Next time equals start time plus interval.
        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    // Property-based tests.

    use proptest::prelude::*;

    // An operation applied to a timer during a property test run.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        AdvanceTime(u64),
        Cancel,
    }

    // Generates operations weighted 8:2 towards advancing; deltas fall in 1..=1000.
    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
        prop_oneof![
            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
            2 => Just(TimerOperation::Cancel),
        ]
    }

    // Generates `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)`; any
    // stop time lies after every possible start time.
    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
        (
            1u64..=100u64,
            0u64..=50u64,
            prop::option::of(51u64..=200u64),
            prop::bool::ANY,
        )
    }

    // Combines 5 to 50 operations with a timer configuration.
    fn timer_test_strategy()
    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
        (
            prop::collection::vec(timer_operation_strategy(), 5..=50),
            timer_config_strategy(),
        )
    }

    // Applies the operations to a fresh timer, checking invariants after each step.
    fn test_timer_with_operations(
        operations: Vec<TimerOperation>,
        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
    ) {
        let mut timer = TestTimer::new(
            Ustr::from("PROP_TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        );

        let mut current_time = start_time_ns;

        for operation in operations {
            // Nothing more to check once the timer has expired.
            if timer.is_expired() {
                break;
            }

            match operation {
                TimerOperation::AdvanceTime(delta) => {
                    let to_time = current_time + delta;
                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
                    current_time = to_time;

                    for (i, event) in events.iter().enumerate() {
                        // Events within one advance are non-decreasing in time.
                        if i > 0 {
                            assert!(
                                event.ts_event >= events[i - 1].ts_event,
                                "Events should be in chronological order"
                            );
                        }

                        assert!(
                            event.ts_event.as_u64() >= start_time_ns,
                            "Event timestamp should not be before start time"
                        );

                        assert!(
                            event.ts_event.as_u64() <= to_time,
                            "Event timestamp should not be after advance time"
                        );

                        if let Some(stop_time_ns) = stop_time_ns {
                            assert!(
                                event.ts_event.as_u64() <= stop_time_ns,
                                "Event timestamp should not exceed stop time"
                            );
                        }
                    }
                }
                TimerOperation::Cancel => {
                    timer.cancel();
                    assert!(timer.is_expired(), "Timer should be expired after cancel");
                }
            }

            if !timer.is_expired() {
                // Lower bound only: the next time is never before the first scheduled event.
                let expected_interval_multiple = if fire_immediately {
                    timer.next_time_ns().as_u64() >= start_time_ns
                } else {
                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
                };
                assert!(
                    expected_interval_multiple,
                    "Next time should respect interval spacing"
                );

                if let Some(stop_time_ns) = stop_time_ns
                    && timer.next_time_ns().as_u64() > stop_time_ns
                {
                    // `TestTimer` is `Copy`, so this probes a copy and leaves `timer` untouched.
                    let mut test_timer = timer;
                    let events: Vec<TimeEvent> = test_timer
                        .advance(UnixNanos::from(stop_time_ns + 1))
                        .collect();
                    assert!(
                        events.is_empty() || test_timer.is_expired(),
                        "Timer should not generate events beyond stop time"
                    );
                }
            }
        }

        // Final check: a still-running timer with a stop time ends when advanced well past it.
        if !timer.is_expired()
            && let Some(stop_time_ns) = stop_time_ns
        {
            let events: Vec<TimeEvent> = timer
                .advance(UnixNanos::from(stop_time_ns + 1000))
                .collect();
            assert!(
                timer.is_expired() || events.is_empty(),
                "Timer should eventually expire or stop generating events"
            );
        }
    }

    proptest! {
        // Runs random operation sequences against random timer configurations.
        #[rstest]
        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
            test_timer_with_operations(operations, config);
        }

        // Stepping exactly one interval at a time yields one event per step, at the expected time.
        #[rstest]
        fn prop_timer_interval_consistency(
            interval_ns in 1u64..=100u64,
            start_time_ns in 0u64..=50u64,
            fire_immediately in prop::bool::ANY,
            advance_count in 1usize..=20usize,
        ) {
            let mut timer = TestTimer::new(
                Ustr::from("CONSISTENCY_TEST"),
                NonZeroU64::new(interval_ns).unwrap(),
                UnixNanos::from(start_time_ns),
                None,
                fire_immediately,
            );

            // Despite the name, this is the time of the next expected event.
            let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };

            for _ in 0..advance_count {
                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();

                if !events.is_empty() {
                    prop_assert_eq!(events.len(), 1);
                    prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
                }

                previous_event_time += interval_ns;
            }
        }
    }
}