nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18use std::{
19    cmp::Ordering,
20    fmt::{Debug, Display},
21    num::NonZeroU64,
22    rc::Rc,
23    sync::{
24        Arc,
25        atomic::{self, AtomicU64},
26    },
27};
28
29use nautilus_core::{
30    UUID4, UnixNanos,
31    correctness::{FAILED, check_valid_string_ascii},
32    datetime::floor_to_nearest_microsecond,
33    time::get_atomic_clock_realtime,
34};
35#[cfg(feature = "python")]
36use pyo3::{Py, PyAny, Python};
37use tokio::{
38    task::JoinHandle,
39    time::{Duration, Instant},
40};
41use ustr::Ustr;
42
43use crate::{runner::TimeEventSender, runtime::get_runtime};
44
45/// Creates a valid nanoseconds interval that is guaranteed to be positive.
46///
47/// # Panics
48///
49/// Panics if `interval_ns` is zero.
50#[must_use]
51pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
52    NonZeroU64::new(std::cmp::max(interval_ns, 1)).expect("`interval_ns` must be positive")
53}
54
55#[repr(C)]
56#[derive(Clone, Debug, PartialEq, Eq)]
57#[cfg_attr(
58    feature = "python",
59    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
60)]
61/// Represents a time event occurring at the event timestamp.
62///
63/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
64/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
65pub struct TimeEvent {
66    /// The event name, identifying the nature or purpose of the event.
67    pub name: Ustr,
68    /// The unique identifier for the event.
69    pub event_id: UUID4,
70    /// UNIX timestamp (nanoseconds) when the event occurred.
71    pub ts_event: UnixNanos,
72    /// UNIX timestamp (nanoseconds) when the instance was created.
73    pub ts_init: UnixNanos,
74}
75
76impl TimeEvent {
77    /// Creates a new [`TimeEvent`] instance.
78    ///
79    /// # Safety
80    ///
81    /// Assumes `name` is a valid string.
82    #[must_use]
83    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
84        Self {
85            name,
86            event_id,
87            ts_event,
88            ts_init,
89        }
90    }
91}
92
93impl Display for TimeEvent {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(
96            f,
97            "{}(name={}, event_id={}, ts_event={}, ts_init={})",
98            stringify!(TimeEvent),
99            self.name,
100            self.event_id,
101            self.ts_event,
102            self.ts_init
103        )
104    }
105}
106
107/// Wrapper for [`TimeEvent`] that implements ordering by timestamp for heap scheduling.
108///
109/// This newtype allows time events to be ordered in a priority queue (max heap) by their
110/// timestamp while keeping [`TimeEvent`] itself clean with standard field-based equality.
111/// Events are ordered in reverse (earlier timestamps have higher priority).
112#[repr(transparent)] // Guarantees zero-cost abstraction with identical memory layout
113#[derive(Clone, Debug, PartialEq, Eq)]
114pub struct ScheduledTimeEvent(pub TimeEvent);
115
116impl ScheduledTimeEvent {
117    /// Creates a new scheduled time event.
118    #[must_use]
119    pub const fn new(event: TimeEvent) -> Self {
120        Self(event)
121    }
122
123    /// Extracts the inner time event.
124    #[must_use]
125    pub fn into_inner(self) -> TimeEvent {
126        self.0
127    }
128}
129
130impl PartialOrd for ScheduledTimeEvent {
131    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
132        Some(self.cmp(other))
133    }
134}
135
136impl Ord for ScheduledTimeEvent {
137    fn cmp(&self, other: &Self) -> Ordering {
138        // Reverse order for max heap: earlier timestamps have higher priority
139        other.0.ts_event.cmp(&self.0.ts_event)
140    }
141}
142
143/// Callback type for time events.
144pub enum TimeEventCallback {
145    #[cfg(feature = "python")]
146    Python(Py<PyAny>),
147    Rust(Rc<dyn Fn(TimeEvent)>),
148}
149
150impl Clone for TimeEventCallback {
151    fn clone(&self) -> Self {
152        match self {
153            #[cfg(feature = "python")]
154            Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
155            Self::Rust(cb) => Self::Rust(cb.clone()),
156        }
157    }
158}
159
160impl Debug for TimeEventCallback {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            #[cfg(feature = "python")]
164            Self::Python(_) => f.write_str("Python callback"),
165            Self::Rust(_) => f.write_str("Rust callback"),
166        }
167    }
168}
169
170impl TimeEventCallback {
171    /// Invokes the callback for the given `TimeEvent`.
172    ///
173    /// # Panics
174    ///
175    /// Panics if the underlying Python callback invocation fails (e.g., raises an exception).
176    pub fn call(&self, event: TimeEvent) {
177        match self {
178            #[cfg(feature = "python")]
179            Self::Python(callback) => {
180                Python::attach(|py| {
181                    callback.call1(py, (event,)).unwrap();
182                });
183            }
184            Self::Rust(callback) => callback(event),
185        }
186    }
187}
188
189impl<F> From<F> for TimeEventCallback
190where
191    F: Fn(TimeEvent) + 'static,
192{
193    fn from(value: F) -> Self {
194        Self::Rust(Rc::new(value))
195    }
196}
197
198impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
199    fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
200        Self::Rust(value)
201    }
202}
203
204#[cfg(feature = "python")]
205impl From<Py<PyAny>> for TimeEventCallback {
206    fn from(value: Py<PyAny>) -> Self {
207        Self::Python(value)
208    }
209}
210
211// TimeEventCallback supports both single-threaded and async use cases:
212// - Python variant uses Py<PyAny> for cross-thread compatibility with Python's GIL.
213// - Rust variant uses Rc<dyn Fn(TimeEvent)> for efficient single-threaded callbacks.
214//
215// SAFETY: The async timer tasks only use Python callbacks, and Rust callbacks are never
216// sent across thread boundaries in practice. This unsafe implementation allows the enum
217// to be moved into async tasks while maintaining the efficient Rc for single-threaded use.
218#[allow(unsafe_code)]
219unsafe impl Send for TimeEventCallback {}
220#[allow(unsafe_code)]
221unsafe impl Sync for TimeEventCallback {}
222
223#[repr(C)]
224#[derive(Clone, Debug)]
225/// Represents a time event and its associated handler.
226///
227/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
228/// when the event's timestamp is reached.
229pub struct TimeEventHandlerV2 {
230    /// The time event.
231    pub event: TimeEvent,
232    /// The callable handler for the event.
233    pub callback: TimeEventCallback,
234}
235
236impl TimeEventHandlerV2 {
237    /// Creates a new [`TimeEventHandlerV2`] instance.
238    #[must_use]
239    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
240        Self { event, callback }
241    }
242
243    /// Executes the handler by invoking its callback for the associated event.
244    ///
245    /// # Panics
246    ///
247    /// Panics if the underlying callback invocation fails (e.g., a Python callback raises an exception).
248    pub fn run(self) {
249        let Self { event, callback } = self;
250        callback.call(event);
251    }
252}
253
254impl PartialOrd for TimeEventHandlerV2 {
255    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
256        Some(self.cmp(other))
257    }
258}
259
260impl PartialEq for TimeEventHandlerV2 {
261    fn eq(&self, other: &Self) -> bool {
262        self.event.ts_event == other.event.ts_event
263    }
264}
265
266impl Eq for TimeEventHandlerV2 {}
267
268impl Ord for TimeEventHandlerV2 {
269    fn cmp(&self, other: &Self) -> Ordering {
270        self.event.ts_event.cmp(&other.event.ts_event)
271    }
272}
273
274/// A test timer for user with a `TestClock`.
275///
276/// `TestTimer` simulates time progression in a controlled environment,
277/// allowing for precise control over event generation in test scenarios.
278///
279/// # Threading
280///
281/// The timer mutates its internal state and should only be used from its owning thread.
282#[derive(Clone, Copy, Debug)]
283pub struct TestTimer {
284    /// The name of the timer.
285    pub name: Ustr,
286    /// The interval between timer events in nanoseconds.
287    pub interval_ns: NonZeroU64,
288    /// The start time of the timer in UNIX nanoseconds.
289    pub start_time_ns: UnixNanos,
290    /// The optional stop time of the timer in UNIX nanoseconds.
291    pub stop_time_ns: Option<UnixNanos>,
292    /// If the timer should fire immediately at start time.
293    pub fire_immediately: bool,
294    next_time_ns: UnixNanos,
295    is_expired: bool,
296}
297
298impl TestTimer {
299    /// Creates a new [`TestTimer`] instance.
300    ///
301    /// # Panics
302    ///
303    /// Panics if `name` is not a valid string.
304    #[must_use]
305    pub fn new(
306        name: Ustr,
307        interval_ns: NonZeroU64,
308        start_time_ns: UnixNanos,
309        stop_time_ns: Option<UnixNanos>,
310        fire_immediately: bool,
311    ) -> Self {
312        check_valid_string_ascii(name, stringify!(name)).expect(FAILED);
313
314        let next_time_ns = if fire_immediately {
315            start_time_ns
316        } else {
317            start_time_ns + interval_ns.get()
318        };
319
320        Self {
321            name,
322            interval_ns,
323            start_time_ns,
324            stop_time_ns,
325            fire_immediately,
326            next_time_ns,
327            is_expired: false,
328        }
329    }
330
331    /// Returns the next time in UNIX nanoseconds when the timer will fire.
332    #[must_use]
333    pub const fn next_time_ns(&self) -> UnixNanos {
334        self.next_time_ns
335    }
336
337    /// Returns whether the timer is expired.
338    #[must_use]
339    pub const fn is_expired(&self) -> bool {
340        self.is_expired
341    }
342
343    #[must_use]
344    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
345        TimeEvent {
346            name: self.name,
347            event_id,
348            ts_event: self.next_time_ns,
349            ts_init,
350        }
351    }
352
353    /// Advance the test timer forward to the given time, generating a sequence
354    /// of events. A [`TimeEvent`] is appended for each time a next event is
355    /// <= the given `to_time_ns`.
356    ///
357    /// This allows testing of multiple time intervals within a single step.
358    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
359        // Calculate how many events should fire up to and including to_time_ns
360        let advances = if self.next_time_ns <= to_time_ns {
361            (to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get() + 1
362        } else {
363            0
364        };
365        self.take(advances as usize).map(|(event, _)| event)
366    }
367
368    /// Cancels the timer (the timer will not generate an event).
369    ///
370    /// Used to stop the timer before its scheduled stop time.
371    pub const fn cancel(&mut self) {
372        self.is_expired = true;
373    }
374}
375
376impl Iterator for TestTimer {
377    type Item = (TimeEvent, UnixNanos);
378
379    fn next(&mut self) -> Option<Self::Item> {
380        if self.is_expired {
381            None
382        } else {
383            // Check if current event would exceed stop time before creating the event
384            if let Some(stop_time_ns) = self.stop_time_ns
385                && self.next_time_ns > stop_time_ns
386            {
387                self.is_expired = true;
388                return None;
389            }
390
391            let item = (
392                TimeEvent {
393                    name: self.name,
394                    event_id: UUID4::new(),
395                    ts_event: self.next_time_ns,
396                    ts_init: self.next_time_ns,
397                },
398                self.next_time_ns,
399            );
400
401            // Check if we should expire after this event (for repeating timers at stop boundary)
402            if let Some(stop_time_ns) = self.stop_time_ns
403                && self.next_time_ns == stop_time_ns
404            {
405                self.is_expired = true;
406            }
407
408            self.next_time_ns += self.interval_ns;
409
410            Some(item)
411        }
412    }
413}
414
415/// A live timer for use with a `LiveClock`.
416///
417/// `LiveTimer` triggers events at specified intervals in a real-time environment,
418/// using Tokio's async runtime to handle scheduling and execution.
419///
420/// # Threading
421///
422/// The timer runs on the runtime thread that created it and dispatches events across threads as needed.
423#[derive(Debug)]
424pub struct LiveTimer {
425    /// The name of the timer.
426    pub name: Ustr,
427    /// The start time of the timer in UNIX nanoseconds.
428    pub interval_ns: NonZeroU64,
429    /// The start time of the timer in UNIX nanoseconds.
430    pub start_time_ns: UnixNanos,
431    /// The optional stop time of the timer in UNIX nanoseconds.
432    pub stop_time_ns: Option<UnixNanos>,
433    /// If the timer should fire immediately at start time.
434    pub fire_immediately: bool,
435    next_time_ns: Arc<AtomicU64>,
436    callback: TimeEventCallback,
437    task_handle: Option<JoinHandle<()>>,
438    sender: Option<Arc<dyn TimeEventSender>>,
439}
440
441impl LiveTimer {
442    /// Creates a new [`LiveTimer`] instance.
443    ///
444    /// # Panics
445    ///
446    /// Panics if `name` is not a valid string.
447    #[allow(clippy::too_many_arguments)]
448    #[must_use]
449    pub fn new(
450        name: Ustr,
451        interval_ns: NonZeroU64,
452        start_time_ns: UnixNanos,
453        stop_time_ns: Option<UnixNanos>,
454        callback: TimeEventCallback,
455        fire_immediately: bool,
456        sender: Option<Arc<dyn TimeEventSender>>,
457    ) -> Self {
458        check_valid_string_ascii(name, stringify!(name)).expect(FAILED);
459
460        let next_time_ns = if fire_immediately {
461            start_time_ns.as_u64()
462        } else {
463            start_time_ns.as_u64() + interval_ns.get()
464        };
465
466        log::debug!("Creating timer '{name}'");
467
468        Self {
469            name,
470            interval_ns,
471            start_time_ns,
472            stop_time_ns,
473            fire_immediately,
474            next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
475            callback,
476            task_handle: None,
477            sender,
478        }
479    }
480
481    /// Returns the next time in UNIX nanoseconds when the timer will fire.
482    ///
483    /// Provides the scheduled time for the next event based on the current state of the timer.
484    #[must_use]
485    pub fn next_time_ns(&self) -> UnixNanos {
486        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
487    }
488
489    /// Returns whether the timer is expired.
490    ///
491    /// An expired timer will not trigger any further events.
492    /// A timer that has not been started is not expired.
493    #[must_use]
494    pub fn is_expired(&self) -> bool {
495        self.task_handle
496            .as_ref()
497            .is_some_and(tokio::task::JoinHandle::is_finished)
498    }
499
500    /// Starts the timer.
501    ///
502    /// Time events will begin triggering at the specified intervals.
503    /// The generated events are handled by the provided callback function.
504    ///
505    /// # Panics
506    ///
507    /// Panics if Rust-based callback system is active and no time event sender has been set.
508    #[allow(unused_variables, reason = "callback is used")]
509    pub fn start(&mut self) {
510        let event_name = self.name;
511        let stop_time_ns = self.stop_time_ns;
512        let interval_ns = self.interval_ns.get();
513        let callback = self.callback.clone();
514
515        // Get current time
516        let clock = get_atomic_clock_realtime();
517        let now_ns = clock.get_time_ns();
518
519        // Check if the timer's alert time is in the past and adjust if needed
520        let now_raw = now_ns.as_u64();
521        let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
522
523        if observed_next <= now_raw {
524            loop {
525                match self.next_time_ns.compare_exchange(
526                    observed_next,
527                    now_raw,
528                    atomic::Ordering::SeqCst,
529                    atomic::Ordering::SeqCst,
530                ) {
531                    Ok(_) => {
532                        if observed_next < now_raw {
533                            let original = UnixNanos::from(observed_next);
534                            log::warn!(
535                                "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
536                                original.to_rfc3339(),
537                            );
538                        }
539                        observed_next = now_raw;
540                        break;
541                    }
542                    Err(actual) => {
543                        observed_next = actual;
544                        if observed_next > now_raw {
545                            break;
546                        }
547                    }
548                }
549            }
550        }
551
552        // Floor the next time to the nearest microsecond which is within the timers accuracy
553        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
554        let next_time_atomic = self.next_time_ns.clone();
555        next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
556
557        let sender = self.sender.clone();
558
559        let rt = get_runtime();
560        let handle = rt.spawn(async move {
561            let clock = get_atomic_clock_realtime();
562
563            // 1-millisecond delay to account for the overhead of initializing a tokio timer
564            let overhead = Duration::from_millis(1);
565            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
566            let mut delay = Duration::from_nanos(delay_ns);
567
568            // Subtract the estimated startup overhead; saturating to zero for sub-ms delays
569            if delay > overhead {
570                delay -= overhead;
571            } else {
572                delay = Duration::from_nanos(0);
573            }
574
575            let start = Instant::now() + delay;
576
577            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
578
579            loop {
580                // SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
581                // first then no tick has been consumed (no event was ready).
582                timer.tick().await;
583                let now_ns = clock.get_time_ns();
584
585                let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
586
587                match callback {
588                    #[cfg(feature = "python")]
589                    TimeEventCallback::Python(ref callback) => {
590                        call_python_with_time_event(event, callback);
591                    }
592                    TimeEventCallback::Rust(_) => {
593                        debug_assert!(
594                            sender.is_some(),
595                            "LiveTimer with Rust callback requires TimeEventSender"
596                        );
597                        let sender = sender
598                            .as_ref()
599                            .expect("timer event sender was unset for Rust callback system");
600                        let handler = TimeEventHandlerV2::new(event, callback.clone());
601                        sender.send(handler);
602                    }
603                }
604
605                // Prepare next time interval
606                next_time_ns += interval_ns;
607                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
608
609                // Check if expired
610                if let Some(stop_time_ns) = stop_time_ns
611                    && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
612                {
613                    break; // Timer expired
614                }
615            }
616        });
617
618        self.task_handle = Some(handle);
619    }
620
621    /// Cancels the timer.
622    ///
623    /// The timer will not generate a final event.
624    pub fn cancel(&mut self) {
625        log::debug!("Cancel timer '{}'", self.name);
626        if let Some(ref handle) = self.task_handle {
627            handle.abort();
628        }
629    }
630}
631
632#[cfg(feature = "python")]
633fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
634    use nautilus_core::python::IntoPyObjectNautilusExt;
635    use pyo3::types::PyCapsule;
636
637    Python::attach(|py| {
638        // Create a new PyCapsule that owns `event` and registers a destructor so
639        // the contained `TimeEvent` is properly freed once the capsule is
640        // garbage-collected by Python. Without the destructor the memory would
641        // leak because the capsule would not know how to drop the Rust value.
642
643        // Register a destructor that simply drops the `TimeEvent` once the
644        // capsule is freed on the Python side.
645        let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
646            .expect("Error creating `PyCapsule`")
647            .into_py_any_unwrap(py);
648
649        match callback.call1(py, (capsule,)) {
650            Ok(_) => {}
651            Err(e) => tracing::error!("Error on callback: {e:?}"),
652        }
653    });
654}
655
656////////////////////////////////////////////////////////////////////////////////
657// Tests
658////////////////////////////////////////////////////////////////////////////////
659#[cfg(test)]
660mod tests {
661    use std::{num::NonZeroU64, sync::Arc};
662
663    use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
664    use rstest::*;
665    use ustr::Ustr;
666
667    use super::{LiveTimer, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2};
668    use crate::runner::TimeEventSender;
669
670    #[rstest]
671    fn test_test_timer_pop_event() {
672        let mut timer = TestTimer::new(
673            Ustr::from("TEST_TIMER"),
674            NonZeroU64::new(1).unwrap(),
675            UnixNanos::from(1),
676            None,
677            false,
678        );
679
680        assert!(timer.next().is_some());
681        assert!(timer.next().is_some());
682        timer.is_expired = true;
683        assert!(timer.next().is_none());
684    }
685
686    #[rstest]
687    fn test_test_timer_advance_within_next_time_ns() {
688        let mut timer = TestTimer::new(
689            Ustr::from("TEST_TIMER"),
690            NonZeroU64::new(5).unwrap(),
691            UnixNanos::default(),
692            None,
693            false,
694        );
695        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
696        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
697        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
698        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
699        assert_eq!(timer.next_time_ns, 5);
700        assert!(!timer.is_expired);
701    }
702
703    #[rstest]
704    fn test_test_timer_advance_up_to_next_time_ns() {
705        let mut timer = TestTimer::new(
706            Ustr::from("TEST_TIMER"),
707            NonZeroU64::new(1).unwrap(),
708            UnixNanos::default(),
709            None,
710            false,
711        );
712        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
713        assert!(!timer.is_expired);
714    }
715
716    #[rstest]
717    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
718        let mut timer = TestTimer::new(
719            Ustr::from("TEST_TIMER"),
720            NonZeroU64::new(1).unwrap(),
721            UnixNanos::default(),
722            Some(UnixNanos::from(2)),
723            false,
724        );
725        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
726        assert!(timer.is_expired);
727    }
728
729    #[rstest]
730    fn test_test_timer_advance_beyond_next_time_ns() {
731        let mut timer = TestTimer::new(
732            Ustr::from("TEST_TIMER"),
733            NonZeroU64::new(1).unwrap(),
734            UnixNanos::default(),
735            Some(UnixNanos::from(5)),
736            false,
737        );
738        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
739        assert!(timer.is_expired);
740    }
741
742    #[rstest]
743    fn test_test_timer_advance_beyond_stop_time() {
744        let mut timer = TestTimer::new(
745            Ustr::from("TEST_TIMER"),
746            NonZeroU64::new(1).unwrap(),
747            UnixNanos::default(),
748            Some(UnixNanos::from(5)),
749            false,
750        );
751        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
752        assert!(timer.is_expired);
753    }
754
755    #[rstest]
756    fn test_test_timer_advance_exact_boundary() {
757        let mut timer = TestTimer::new(
758            Ustr::from("TEST_TIMER"),
759            NonZeroU64::new(5).unwrap(),
760            UnixNanos::from(0),
761            None,
762            false,
763        );
764        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
765        assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
766
767        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
768        assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
769    }
770
771    #[rstest]
772    fn test_test_timer_fire_immediately_true() {
773        let mut timer = TestTimer::new(
774            Ustr::from("TEST_TIMER"),
775            NonZeroU64::new(5).unwrap(),
776            UnixNanos::from(10),
777            None,
778            true, // fire_immediately = true
779        );
780
781        // With fire_immediately=true, next_time_ns should be start_time_ns
782        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
783
784        // Advance to start time should produce an event
785        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
786        assert_eq!(events.len(), 1);
787        assert_eq!(events[0].ts_event, UnixNanos::from(10));
788
789        // Next event should be at start_time + interval
790        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
791    }
792
793    #[rstest]
794    fn test_test_timer_fire_immediately_false() {
795        let mut timer = TestTimer::new(
796            Ustr::from("TEST_TIMER"),
797            NonZeroU64::new(5).unwrap(),
798            UnixNanos::from(10),
799            None,
800            false, // fire_immediately = false
801        );
802
803        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
804        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
805
806        // Advance to start time should produce no events
807        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
808        assert_eq!(events.len(), 0);
809
810        // Advance to first interval should produce an event
811        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
812        assert_eq!(events.len(), 1);
813        assert_eq!(events[0].ts_event, UnixNanos::from(15));
814    }
815
816    #[rstest]
817    fn test_live_timer_fire_immediately_field() {
818        let timer = LiveTimer::new(
819            Ustr::from("TEST_TIMER"),
820            NonZeroU64::new(1000).unwrap(),
821            UnixNanos::from(100),
822            None,
823            TimeEventCallback::from(|_| {}),
824            true, // fire_immediately = true
825            None, // time_event_sender
826        );
827
828        // Verify the field is set correctly
829        assert!(timer.fire_immediately);
830
831        // With fire_immediately=true, next_time_ns should be start_time_ns
832        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
833    }
834
835    #[rstest]
836    fn test_live_timer_fire_immediately_false_field() {
837        let timer = LiveTimer::new(
838            Ustr::from("TEST_TIMER"),
839            NonZeroU64::new(1000).unwrap(),
840            UnixNanos::from(100),
841            None,
842            TimeEventCallback::from(|_| {}),
843            false, // fire_immediately = false
844            None,  // time_event_sender
845        );
846
847        // Verify the field is set correctly
848        assert!(!timer.fire_immediately);
849
850        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
851        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
852    }
853
854    #[rstest]
855    fn test_live_timer_adjusts_past_due_start_time() {
856        #[derive(Debug)]
857        struct NoopSender;
858
859        impl TimeEventSender for NoopSender {
860            fn send(&self, _handler: TimeEventHandlerV2) {}
861        }
862
863        let sender = Arc::new(NoopSender);
864        let mut timer = LiveTimer::new(
865            Ustr::from("PAST_TIMER"),
866            NonZeroU64::new(1).unwrap(),
867            UnixNanos::from(0),
868            None,
869            TimeEventCallback::from(|_| {}),
870            true,
871            Some(sender),
872        );
873
874        let before = get_atomic_clock_realtime().get_time_ns();
875
876        timer.start();
877
878        assert!(timer.next_time_ns() >= before);
879
880        timer.cancel();
881    }
882
883    ////////////////////////////////////////////////////////////////////////////////
884    // Property-based testing
885    ////////////////////////////////////////////////////////////////////////////////
886
887    use proptest::prelude::*;
888
889    #[derive(Clone, Debug)]
890    enum TimerOperation {
891        AdvanceTime(u64),
892        Cancel,
893    }
894
895    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
896        prop_oneof![
897            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
898            2 => Just(TimerOperation::Cancel),
899        ]
900    }
901
902    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
903        (
904            1u64..=100u64,                    // interval_ns (1-100)
905            0u64..=50u64,                     // start_time_ns (0-50)
906            prop::option::of(51u64..=200u64), // stop_time_ns (51-200 or None)
907            prop::bool::ANY,                  // fire_immediately
908        )
909    }
910
911    fn timer_test_strategy()
912    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
913        (
914            prop::collection::vec(timer_operation_strategy(), 5..=50),
915            timer_config_strategy(),
916        )
917    }
918
919    fn test_timer_with_operations(
920        operations: Vec<TimerOperation>,
921        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
922    ) {
923        let mut timer = TestTimer::new(
924            Ustr::from("PROP_TEST_TIMER"),
925            NonZeroU64::new(interval_ns).unwrap(),
926            UnixNanos::from(start_time_ns),
927            stop_time_ns.map(UnixNanos::from),
928            fire_immediately,
929        );
930
931        let mut current_time = start_time_ns;
932
933        for operation in operations {
934            if timer.is_expired() {
935                break;
936            }
937
938            match operation {
939                TimerOperation::AdvanceTime(delta) => {
940                    let to_time = current_time + delta;
941                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
942                    current_time = to_time;
943
944                    // Verify event ordering and timing
945                    for (i, event) in events.iter().enumerate() {
946                        // Event timestamps should be in order
947                        if i > 0 {
948                            assert!(
949                                event.ts_event >= events[i - 1].ts_event,
950                                "Events should be in chronological order"
951                            );
952                        }
953
954                        // Event timestamp should be within reasonable bounds
955                        assert!(
956                            event.ts_event.as_u64() >= start_time_ns,
957                            "Event timestamp should not be before start time"
958                        );
959
960                        assert!(
961                            event.ts_event.as_u64() <= to_time,
962                            "Event timestamp should not be after advance time"
963                        );
964
965                        // If there's a stop time, event should not exceed it
966                        if let Some(stop_time_ns) = stop_time_ns {
967                            assert!(
968                                event.ts_event.as_u64() <= stop_time_ns,
969                                "Event timestamp should not exceed stop time"
970                            );
971                        }
972                    }
973                }
974                TimerOperation::Cancel => {
975                    timer.cancel();
976                    assert!(timer.is_expired(), "Timer should be expired after cancel");
977                }
978            }
979
980            // Timer invariants
981            if !timer.is_expired() {
982                // Next time should be properly spaced
983                let expected_interval_multiple = if fire_immediately {
984                    timer.next_time_ns().as_u64() >= start_time_ns
985                } else {
986                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
987                };
988                assert!(
989                    expected_interval_multiple,
990                    "Next time should respect interval spacing"
991                );
992
993                // If timer has stop time, check if it should be considered logically expired
994                // Note: Timer only becomes actually expired when advance() or next() is called
995                if let Some(stop_time_ns) = stop_time_ns
996                    && timer.next_time_ns().as_u64() > stop_time_ns
997                {
998                    // The timer should expire on the next advance/iteration
999                    let mut test_timer = timer;
1000                    let events: Vec<TimeEvent> = test_timer
1001                        .advance(UnixNanos::from(stop_time_ns + 1))
1002                        .collect();
1003                    assert!(
1004                        events.is_empty() || test_timer.is_expired(),
1005                        "Timer should not generate events beyond stop time"
1006                    );
1007                }
1008            }
1009        }
1010
1011        // Final consistency check: if timer is not expired and we haven't hit stop time,
1012        // advancing far enough should eventually expire it
1013        if !timer.is_expired()
1014            && let Some(stop_time_ns) = stop_time_ns
1015        {
1016            let events: Vec<TimeEvent> = timer
1017                .advance(UnixNanos::from(stop_time_ns + 1000))
1018                .collect();
1019            assert!(
1020                timer.is_expired() || events.is_empty(),
1021                "Timer should eventually expire or stop generating events"
1022            );
1023        }
1024    }
1025
1026    proptest! {
1027        #[rstest]
1028        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1029            test_timer_with_operations(operations, config);
1030        }
1031
1032        #[rstest]
1033        fn prop_timer_interval_consistency(
1034            interval_ns in 1u64..=100u64,
1035            start_time_ns in 0u64..=50u64,
1036            fire_immediately in prop::bool::ANY,
1037            advance_count in 1usize..=20usize,
1038        ) {
1039            let mut timer = TestTimer::new(
1040                Ustr::from("CONSISTENCY_TEST"),
1041                NonZeroU64::new(interval_ns).unwrap(),
1042                UnixNanos::from(start_time_ns),
1043                None, // No stop time for this test
1044                fire_immediately,
1045            );
1046
1047            let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1048
1049            for _ in 0..advance_count {
1050                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1051
1052                if !events.is_empty() {
1053                    // Should get exactly one event at the expected time
1054                    prop_assert_eq!(events.len(), 1);
1055                    prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1056                }
1057
1058                previous_event_time += interval_ns;
1059            }
1060        }
1061    }
1062}