1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::{
24 Arc,
25 atomic::{self, AtomicU64},
26 },
27};
28
29use nautilus_core::{
30 UUID4, UnixNanos,
31 correctness::{FAILED, check_valid_string_ascii},
32 datetime::floor_to_nearest_microsecond,
33 time::get_atomic_clock_realtime,
34};
35#[cfg(feature = "python")]
36use pyo3::{Py, PyAny, Python};
37use tokio::{
38 task::JoinHandle,
39 time::{Duration, Instant},
40};
41use ustr::Ustr;
42
43use crate::{runner::TimeEventSender, runtime::get_runtime};
44
/// Converts `interval_ns` into a [`NonZeroU64`], substituting `1` when the input is zero.
///
/// Every non-zero input is returned unchanged.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // Zero is the only value `NonZeroU64::new` rejects; fall back to the minimum (1).
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
54
/// A time event produced by a timer.
///
/// The struct is `#[repr(C)]` and, when the `python` feature is enabled, is exposed as a
/// `pyo3` class in the `nautilus_trader.core.nautilus_pyo3.common` module.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct TimeEvent {
    /// The event name (the timers in this file set it to the timer's own name).
    pub name: Ustr,
    /// The identifier of this event.
    pub event_id: UUID4,
    /// The time the event was scheduled to fire (the timers in this file use their
    /// `next_time_ns` value here).
    pub ts_event: UnixNanos,
    /// The time the event object was initialized.
    pub ts_init: UnixNanos,
}
75
76impl TimeEvent {
77 #[must_use]
83 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
84 Self {
85 name,
86 event_id,
87 ts_event,
88 ts_init,
89 }
90 }
91}
92
93impl Display for TimeEvent {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 write!(
96 f,
97 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
98 stringify!(TimeEvent),
99 self.name,
100 self.event_id,
101 self.ts_event,
102 self.ts_init
103 )
104 }
105}
106
/// Newtype around [`TimeEvent`] whose ordering is keyed on `ts_event` in reverse.
///
/// Because the [`Ord`] implementation is reversed, the event with the earliest `ts_event`
/// compares as the greatest, so a max-heap pops the earliest event first.
///
/// NOTE(review): the derived `PartialEq` compares every field of the inner event, whereas
/// `Ord` only looks at `ts_event`; two values can therefore compare `Equal` without being
/// `==`. Confirm that no caller relies on the two agreeing.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
115
116impl ScheduledTimeEvent {
117 #[must_use]
119 pub const fn new(event: TimeEvent) -> Self {
120 Self(event)
121 }
122
123 #[must_use]
125 pub fn into_inner(self) -> TimeEvent {
126 self.0
127 }
128}
129
130impl PartialOrd for ScheduledTimeEvent {
131 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
132 Some(self.cmp(other))
133 }
134}
135
136impl Ord for ScheduledTimeEvent {
137 fn cmp(&self, other: &Self) -> Ordering {
138 other.0.ts_event.cmp(&self.0.ts_event)
140 }
141}
142
/// A callback that is invoked with a [`TimeEvent`].
pub enum TimeEventCallback {
    /// A Python callable; only compiled in when the `python` feature is enabled.
    #[cfg(feature = "python")]
    Python(Py<PyAny>),
    /// A reference-counted Rust closure taking the event by value.
    Rust(Rc<dyn Fn(TimeEvent)>),
}
149
150impl Clone for TimeEventCallback {
151 fn clone(&self) -> Self {
152 match self {
153 #[cfg(feature = "python")]
154 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
155 Self::Rust(cb) => Self::Rust(cb.clone()),
156 }
157 }
158}
159
160impl Debug for TimeEventCallback {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 #[cfg(feature = "python")]
164 Self::Python(_) => f.write_str("Python callback"),
165 Self::Rust(_) => f.write_str("Rust callback"),
166 }
167 }
168}
169
170impl TimeEventCallback {
171 pub fn call(&self, event: TimeEvent) {
177 match self {
178 #[cfg(feature = "python")]
179 Self::Python(callback) => {
180 Python::attach(|py| {
181 callback.call1(py, (event,)).unwrap();
182 });
183 }
184 Self::Rust(callback) => callback(event),
185 }
186 }
187}
188
189impl<F> From<F> for TimeEventCallback
190where
191 F: Fn(TimeEvent) + 'static,
192{
193 fn from(value: F) -> Self {
194 Self::Rust(Rc::new(value))
195 }
196}
197
198impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
199 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
200 Self::Rust(value)
201 }
202}
203
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    /// Uses a Python object as a Python callback; whether it is callable is not checked here.
    fn from(value: Py<PyAny>) -> Self {
        TimeEventCallback::Python(value)
    }
}
210
// Manual `Send`/`Sync` opt-in; `#[allow(unsafe_code)]` is needed because an `unsafe impl`
// counts as unsafe code.
//
// NOTE(review): the `Rust` variant holds an `Rc`, which is neither `Send` nor `Sync`, so
// the compiler cannot check these impls. `LiveTimer::start` moves a clone of the callback
// into a spawned task and clones it again inside that task. Confirm that a Rust callback
// is never cloned, dropped, or invoked from more than one thread at a time; otherwise
// these impls are unsound.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
222
/// Pairs a [`TimeEvent`] with the [`TimeEventCallback`] that should receive it.
///
/// Equality and ordering of handlers are implemented on `event.ts_event` only.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The event to deliver.
    pub event: TimeEvent,
    /// The callback that is invoked with `event` when the handler runs.
    pub callback: TimeEventCallback,
}
235
236impl TimeEventHandlerV2 {
237 #[must_use]
239 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
240 Self { event, callback }
241 }
242
243 pub fn run(self) {
249 let Self { event, callback } = self;
250 callback.call(event);
251 }
252}
253
254impl PartialOrd for TimeEventHandlerV2 {
255 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
256 Some(self.cmp(other))
257 }
258}
259
260impl PartialEq for TimeEventHandlerV2 {
261 fn eq(&self, other: &Self) -> bool {
262 self.event.ts_event == other.event.ts_event
263 }
264}
265
// Marker impl: the `ts_event`-only equality from `PartialEq` is treated as a full
// equivalence relation, which the `Ord` implementation requires.
impl Eq for TimeEventHandlerV2 {}
267
268impl Ord for TimeEventHandlerV2 {
269 fn cmp(&self, other: &Self) -> Ordering {
270 self.event.ts_event.cmp(&other.event.ts_event)
271 }
272}
273
/// A manually driven timer that yields [`TimeEvent`]s as it is advanced or iterated.
///
/// It has no background task or clock of its own: time only moves when `advance` is called
/// or when the timer is iterated. The type is `Copy`, so a copy advances independently of
/// the original.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The timer name, also used as the name of every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer.
    pub start_time_ns: UnixNanos,
    /// Optional stop time; no event is produced with a timestamp after it.
    pub stop_time_ns: Option<UnixNanos>,
    /// Whether the first event fires at `start_time_ns` rather than one interval later.
    pub fire_immediately: bool,
    /// Timestamp of the next event to be produced.
    next_time_ns: UnixNanos,
    /// Set once the timer is cancelled or has reached or passed its stop time.
    is_expired: bool,
}
297
298impl TestTimer {
299 #[must_use]
305 pub fn new(
306 name: Ustr,
307 interval_ns: NonZeroU64,
308 start_time_ns: UnixNanos,
309 stop_time_ns: Option<UnixNanos>,
310 fire_immediately: bool,
311 ) -> Self {
312 check_valid_string_ascii(name, stringify!(name)).expect(FAILED);
313
314 let next_time_ns = if fire_immediately {
315 start_time_ns
316 } else {
317 start_time_ns + interval_ns.get()
318 };
319
320 Self {
321 name,
322 interval_ns,
323 start_time_ns,
324 stop_time_ns,
325 fire_immediately,
326 next_time_ns,
327 is_expired: false,
328 }
329 }
330
331 #[must_use]
333 pub const fn next_time_ns(&self) -> UnixNanos {
334 self.next_time_ns
335 }
336
337 #[must_use]
339 pub const fn is_expired(&self) -> bool {
340 self.is_expired
341 }
342
343 #[must_use]
344 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
345 TimeEvent {
346 name: self.name,
347 event_id,
348 ts_event: self.next_time_ns,
349 ts_init,
350 }
351 }
352
353 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
359 let advances = if self.next_time_ns <= to_time_ns {
361 (to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get() + 1
362 } else {
363 0
364 };
365 self.take(advances as usize).map(|(event, _)| event)
366 }
367
368 pub const fn cancel(&mut self) {
372 self.is_expired = true;
373 }
374}
375
376impl Iterator for TestTimer {
377 type Item = (TimeEvent, UnixNanos);
378
379 fn next(&mut self) -> Option<Self::Item> {
380 if self.is_expired {
381 None
382 } else {
383 if let Some(stop_time_ns) = self.stop_time_ns
385 && self.next_time_ns > stop_time_ns
386 {
387 self.is_expired = true;
388 return None;
389 }
390
391 let item = (
392 TimeEvent {
393 name: self.name,
394 event_id: UUID4::new(),
395 ts_event: self.next_time_ns,
396 ts_init: self.next_time_ns,
397 },
398 self.next_time_ns,
399 );
400
401 if let Some(stop_time_ns) = self.stop_time_ns
403 && self.next_time_ns == stop_time_ns
404 {
405 self.is_expired = true;
406 }
407
408 self.next_time_ns += self.interval_ns;
409
410 Some(item)
411 }
412 }
413}
414
/// A timer that fires [`TimeEvent`]s from a task spawned on the runtime.
///
/// The timer is inert until `start` is called.
#[derive(Debug)]
pub struct LiveTimer {
    /// The timer name, also used as the name of every event it produces.
    pub name: Ustr,
    /// The spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer.
    pub start_time_ns: UnixNanos,
    /// Optional stop time; the task stops once the next fire time or the clock reaches it.
    pub stop_time_ns: Option<UnixNanos>,
    /// Whether the first event fires at `start_time_ns` rather than one interval later.
    pub fire_immediately: bool,
    /// Next fire time, shared with the spawned task, which updates it after every tick.
    next_time_ns: Arc<AtomicU64>,
    /// The callback that receives the events.
    callback: TimeEventCallback,
    /// Handle of the spawned task; `None` until `start` has been called.
    task_handle: Option<JoinHandle<()>>,
    /// Sender used to hand events to Rust callbacks; must be set for Rust callbacks.
    sender: Option<Arc<dyn TimeEventSender>>,
}
440
441impl LiveTimer {
442 #[allow(clippy::too_many_arguments)]
448 #[must_use]
449 pub fn new(
450 name: Ustr,
451 interval_ns: NonZeroU64,
452 start_time_ns: UnixNanos,
453 stop_time_ns: Option<UnixNanos>,
454 callback: TimeEventCallback,
455 fire_immediately: bool,
456 sender: Option<Arc<dyn TimeEventSender>>,
457 ) -> Self {
458 check_valid_string_ascii(name, stringify!(name)).expect(FAILED);
459
460 let next_time_ns = if fire_immediately {
461 start_time_ns.as_u64()
462 } else {
463 start_time_ns.as_u64() + interval_ns.get()
464 };
465
466 log::debug!("Creating timer '{name}'");
467
468 Self {
469 name,
470 interval_ns,
471 start_time_ns,
472 stop_time_ns,
473 fire_immediately,
474 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
475 callback,
476 task_handle: None,
477 sender,
478 }
479 }
480
481 #[must_use]
485 pub fn next_time_ns(&self) -> UnixNanos {
486 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
487 }
488
489 #[must_use]
494 pub fn is_expired(&self) -> bool {
495 self.task_handle
496 .as_ref()
497 .is_some_and(tokio::task::JoinHandle::is_finished)
498 }
499
500 #[allow(unused_variables, reason = "callback is used")]
509 pub fn start(&mut self) {
510 let event_name = self.name;
511 let stop_time_ns = self.stop_time_ns;
512 let interval_ns = self.interval_ns.get();
513 let callback = self.callback.clone();
514
515 let clock = get_atomic_clock_realtime();
517 let now_ns = clock.get_time_ns();
518
519 let now_raw = now_ns.as_u64();
521 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
522
523 if observed_next <= now_raw {
524 loop {
525 match self.next_time_ns.compare_exchange(
526 observed_next,
527 now_raw,
528 atomic::Ordering::SeqCst,
529 atomic::Ordering::SeqCst,
530 ) {
531 Ok(_) => {
532 if observed_next < now_raw {
533 let original = UnixNanos::from(observed_next);
534 log::warn!(
535 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
536 original.to_rfc3339(),
537 );
538 }
539 observed_next = now_raw;
540 break;
541 }
542 Err(actual) => {
543 observed_next = actual;
544 if observed_next > now_raw {
545 break;
546 }
547 }
548 }
549 }
550 }
551
552 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
554 let next_time_atomic = self.next_time_ns.clone();
555 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
556
557 let sender = self.sender.clone();
558
559 let rt = get_runtime();
560 let handle = rt.spawn(async move {
561 let clock = get_atomic_clock_realtime();
562
563 let overhead = Duration::from_millis(1);
565 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
566 let mut delay = Duration::from_nanos(delay_ns);
567
568 if delay > overhead {
570 delay -= overhead;
571 } else {
572 delay = Duration::from_nanos(0);
573 }
574
575 let start = Instant::now() + delay;
576
577 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
578
579 loop {
580 timer.tick().await;
583 let now_ns = clock.get_time_ns();
584
585 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
586
587 match callback {
588 #[cfg(feature = "python")]
589 TimeEventCallback::Python(ref callback) => {
590 call_python_with_time_event(event, callback);
591 }
592 TimeEventCallback::Rust(_) => {
593 debug_assert!(
594 sender.is_some(),
595 "LiveTimer with Rust callback requires TimeEventSender"
596 );
597 let sender = sender
598 .as_ref()
599 .expect("timer event sender was unset for Rust callback system");
600 let handler = TimeEventHandlerV2::new(event, callback.clone());
601 sender.send(handler);
602 }
603 }
604
605 next_time_ns += interval_ns;
607 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
608
609 if let Some(stop_time_ns) = stop_time_ns
611 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
612 {
613 break; }
615 }
616 });
617
618 self.task_handle = Some(handle);
619 }
620
621 pub fn cancel(&mut self) {
625 log::debug!("Cancel timer '{}'", self.name);
626 if let Some(ref handle) = self.task_handle {
627 handle.abort();
628 }
629 }
630}
631
/// Invokes a Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// An error returned by the Python call is logged rather than propagated.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        // The capsule owns `event`; the no-op destructor closure is registered for when the
        // capsule is released.
        let wrapped = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`");
        let capsule: Py<PyAny> = wrapped.into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
655
656#[cfg(test)]
660mod tests {
661 use std::{num::NonZeroU64, sync::Arc};
662
663 use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
664 use rstest::*;
665 use ustr::Ustr;
666
667 use super::{LiveTimer, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2};
668 use crate::runner::TimeEventSender;
669
670 #[rstest]
671 fn test_test_timer_pop_event() {
672 let mut timer = TestTimer::new(
673 Ustr::from("TEST_TIMER"),
674 NonZeroU64::new(1).unwrap(),
675 UnixNanos::from(1),
676 None,
677 false,
678 );
679
680 assert!(timer.next().is_some());
681 assert!(timer.next().is_some());
682 timer.is_expired = true;
683 assert!(timer.next().is_none());
684 }
685
686 #[rstest]
687 fn test_test_timer_advance_within_next_time_ns() {
688 let mut timer = TestTimer::new(
689 Ustr::from("TEST_TIMER"),
690 NonZeroU64::new(5).unwrap(),
691 UnixNanos::default(),
692 None,
693 false,
694 );
695 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
696 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
697 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
698 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
699 assert_eq!(timer.next_time_ns, 5);
700 assert!(!timer.is_expired);
701 }
702
703 #[rstest]
704 fn test_test_timer_advance_up_to_next_time_ns() {
705 let mut timer = TestTimer::new(
706 Ustr::from("TEST_TIMER"),
707 NonZeroU64::new(1).unwrap(),
708 UnixNanos::default(),
709 None,
710 false,
711 );
712 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
713 assert!(!timer.is_expired);
714 }
715
716 #[rstest]
717 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
718 let mut timer = TestTimer::new(
719 Ustr::from("TEST_TIMER"),
720 NonZeroU64::new(1).unwrap(),
721 UnixNanos::default(),
722 Some(UnixNanos::from(2)),
723 false,
724 );
725 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
726 assert!(timer.is_expired);
727 }
728
729 #[rstest]
730 fn test_test_timer_advance_beyond_next_time_ns() {
731 let mut timer = TestTimer::new(
732 Ustr::from("TEST_TIMER"),
733 NonZeroU64::new(1).unwrap(),
734 UnixNanos::default(),
735 Some(UnixNanos::from(5)),
736 false,
737 );
738 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
739 assert!(timer.is_expired);
740 }
741
742 #[rstest]
743 fn test_test_timer_advance_beyond_stop_time() {
744 let mut timer = TestTimer::new(
745 Ustr::from("TEST_TIMER"),
746 NonZeroU64::new(1).unwrap(),
747 UnixNanos::default(),
748 Some(UnixNanos::from(5)),
749 false,
750 );
751 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
752 assert!(timer.is_expired);
753 }
754
755 #[rstest]
756 fn test_test_timer_advance_exact_boundary() {
757 let mut timer = TestTimer::new(
758 Ustr::from("TEST_TIMER"),
759 NonZeroU64::new(5).unwrap(),
760 UnixNanos::from(0),
761 None,
762 false,
763 );
764 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
765 assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
766
767 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
768 assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
769 }
770
771 #[rstest]
772 fn test_test_timer_fire_immediately_true() {
773 let mut timer = TestTimer::new(
774 Ustr::from("TEST_TIMER"),
775 NonZeroU64::new(5).unwrap(),
776 UnixNanos::from(10),
777 None,
778 true, );
780
781 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
783
784 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
786 assert_eq!(events.len(), 1);
787 assert_eq!(events[0].ts_event, UnixNanos::from(10));
788
789 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
791 }
792
793 #[rstest]
794 fn test_test_timer_fire_immediately_false() {
795 let mut timer = TestTimer::new(
796 Ustr::from("TEST_TIMER"),
797 NonZeroU64::new(5).unwrap(),
798 UnixNanos::from(10),
799 None,
800 false, );
802
803 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
805
806 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
808 assert_eq!(events.len(), 0);
809
810 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
812 assert_eq!(events.len(), 1);
813 assert_eq!(events[0].ts_event, UnixNanos::from(15));
814 }
815
816 #[rstest]
817 fn test_live_timer_fire_immediately_field() {
818 let timer = LiveTimer::new(
819 Ustr::from("TEST_TIMER"),
820 NonZeroU64::new(1000).unwrap(),
821 UnixNanos::from(100),
822 None,
823 TimeEventCallback::from(|_| {}),
824 true, None, );
827
828 assert!(timer.fire_immediately);
830
831 assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
833 }
834
835 #[rstest]
836 fn test_live_timer_fire_immediately_false_field() {
837 let timer = LiveTimer::new(
838 Ustr::from("TEST_TIMER"),
839 NonZeroU64::new(1000).unwrap(),
840 UnixNanos::from(100),
841 None,
842 TimeEventCallback::from(|_| {}),
843 false, None, );
846
847 assert!(!timer.fire_immediately);
849
850 assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
852 }
853
854 #[rstest]
855 fn test_live_timer_adjusts_past_due_start_time() {
856 #[derive(Debug)]
857 struct NoopSender;
858
859 impl TimeEventSender for NoopSender {
860 fn send(&self, _handler: TimeEventHandlerV2) {}
861 }
862
863 let sender = Arc::new(NoopSender);
864 let mut timer = LiveTimer::new(
865 Ustr::from("PAST_TIMER"),
866 NonZeroU64::new(1).unwrap(),
867 UnixNanos::from(0),
868 None,
869 TimeEventCallback::from(|_| {}),
870 true,
871 Some(sender),
872 );
873
874 let before = get_atomic_clock_realtime().get_time_ns();
875
876 timer.start();
877
878 assert!(timer.next_time_ns() >= before);
879
880 timer.cancel();
881 }
882
883 use proptest::prelude::*;
888
889 #[derive(Clone, Debug)]
890 enum TimerOperation {
891 AdvanceTime(u64),
892 Cancel,
893 }
894
895 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
896 prop_oneof![
897 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
898 2 => Just(TimerOperation::Cancel),
899 ]
900 }
901
902 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
903 (
904 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
909 }
910
911 fn timer_test_strategy()
912 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
913 (
914 prop::collection::vec(timer_operation_strategy(), 5..=50),
915 timer_config_strategy(),
916 )
917 }
918
919 fn test_timer_with_operations(
920 operations: Vec<TimerOperation>,
921 (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
922 ) {
923 let mut timer = TestTimer::new(
924 Ustr::from("PROP_TEST_TIMER"),
925 NonZeroU64::new(interval_ns).unwrap(),
926 UnixNanos::from(start_time_ns),
927 stop_time_ns.map(UnixNanos::from),
928 fire_immediately,
929 );
930
931 let mut current_time = start_time_ns;
932
933 for operation in operations {
934 if timer.is_expired() {
935 break;
936 }
937
938 match operation {
939 TimerOperation::AdvanceTime(delta) => {
940 let to_time = current_time + delta;
941 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
942 current_time = to_time;
943
944 for (i, event) in events.iter().enumerate() {
946 if i > 0 {
948 assert!(
949 event.ts_event >= events[i - 1].ts_event,
950 "Events should be in chronological order"
951 );
952 }
953
954 assert!(
956 event.ts_event.as_u64() >= start_time_ns,
957 "Event timestamp should not be before start time"
958 );
959
960 assert!(
961 event.ts_event.as_u64() <= to_time,
962 "Event timestamp should not be after advance time"
963 );
964
965 if let Some(stop_time_ns) = stop_time_ns {
967 assert!(
968 event.ts_event.as_u64() <= stop_time_ns,
969 "Event timestamp should not exceed stop time"
970 );
971 }
972 }
973 }
974 TimerOperation::Cancel => {
975 timer.cancel();
976 assert!(timer.is_expired(), "Timer should be expired after cancel");
977 }
978 }
979
980 if !timer.is_expired() {
982 let expected_interval_multiple = if fire_immediately {
984 timer.next_time_ns().as_u64() >= start_time_ns
985 } else {
986 timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
987 };
988 assert!(
989 expected_interval_multiple,
990 "Next time should respect interval spacing"
991 );
992
993 if let Some(stop_time_ns) = stop_time_ns
996 && timer.next_time_ns().as_u64() > stop_time_ns
997 {
998 let mut test_timer = timer;
1000 let events: Vec<TimeEvent> = test_timer
1001 .advance(UnixNanos::from(stop_time_ns + 1))
1002 .collect();
1003 assert!(
1004 events.is_empty() || test_timer.is_expired(),
1005 "Timer should not generate events beyond stop time"
1006 );
1007 }
1008 }
1009 }
1010
1011 if !timer.is_expired()
1014 && let Some(stop_time_ns) = stop_time_ns
1015 {
1016 let events: Vec<TimeEvent> = timer
1017 .advance(UnixNanos::from(stop_time_ns + 1000))
1018 .collect();
1019 assert!(
1020 timer.is_expired() || events.is_empty(),
1021 "Timer should eventually expire or stop generating events"
1022 );
1023 }
1024 }
1025
1026 proptest! {
1027 #[rstest]
1028 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1029 test_timer_with_operations(operations, config);
1030 }
1031
1032 #[rstest]
1033 fn prop_timer_interval_consistency(
1034 interval_ns in 1u64..=100u64,
1035 start_time_ns in 0u64..=50u64,
1036 fire_immediately in prop::bool::ANY,
1037 advance_count in 1usize..=20usize,
1038 ) {
1039 let mut timer = TestTimer::new(
1040 Ustr::from("CONSISTENCY_TEST"),
1041 NonZeroU64::new(interval_ns).unwrap(),
1042 UnixNanos::from(start_time_ns),
1043 None, fire_immediately,
1045 );
1046
1047 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1048
1049 for _ in 0..advance_count {
1050 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1051
1052 if !events.is_empty() {
1053 prop_assert_eq!(events.len(), 1);
1055 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1056 }
1057
1058 previous_event_time += interval_ns;
1059 }
1060 }
1061 }
1062}