nautilus_common/
clock.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and static `Clock` implementations.
17
18use std::{
19    collections::{BTreeMap, BinaryHeap, HashMap},
20    fmt::Debug,
21    ops::Deref,
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31    AtomicTime, UnixNanos,
32    correctness::{check_positive_u64, check_predicate_true, check_valid_string_ascii},
33    time::get_atomic_clock_realtime,
34};
35use thousands::Separable;
36use tokio::sync::Mutex;
37use ustr::Ustr;
38
39use crate::{
40    runner::{TimeEventSender, get_time_event_sender},
41    timer::{
42        LiveTimer, ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
43        create_valid_interval,
44    },
45};
46
47/// Represents a type of clock.
48///
49/// # Notes
50///
51/// An active timer is one which has not expired (`timer.is_expired == False`).
52pub trait Clock: Debug {
53    /// Returns the current date and time as a timezone-aware `DateTime<UTC>`.
54    fn utc_now(&self) -> DateTime<Utc> {
55        DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
56    }
57
58    /// Returns the current UNIX timestamp in nanoseconds (ns).
59    fn timestamp_ns(&self) -> UnixNanos;
60
61    /// Returns the current UNIX timestamp in microseconds (μs).
62    fn timestamp_us(&self) -> u64;
63
64    /// Returns the current UNIX timestamp in milliseconds (ms).
65    fn timestamp_ms(&self) -> u64;
66
67    /// Returns the current UNIX timestamp in seconds.
68    fn timestamp(&self) -> f64;
69
70    /// Returns the names of active timers in the clock.
71    fn timer_names(&self) -> Vec<&str>;
72
73    /// Returns the count of active timers in the clock.
74    fn timer_count(&self) -> usize;
75
76    /// If a timer with the `name` exists.
77    fn timer_exists(&self, name: &Ustr) -> bool;
78
79    /// Register a default event handler for the clock. If a timer
80    /// does not have an event handler, then this handler is used.
81    fn register_default_handler(&mut self, callback: TimeEventCallback);
82
83    /// Get handler for [`TimeEvent`].
84    ///
85    /// Note: Panics if the event does not have an associated handler
86    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
87
88    /// Set a timer to alert at the specified time.
89    ///
90    /// See [`Clock::set_time_alert_ns`] for flag semantics.
91    ///
92    /// # Callback
93    ///
94    /// - `callback`: Some, then callback handles the time event.
95    /// - `callback`: None, then the clock’s default time event callback is used.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if `name` is invalid, `alert_time` is in the past when not allowed,
100    /// or any predicate check fails.
101    #[allow(clippy::too_many_arguments)]
102    fn set_time_alert(
103        &mut self,
104        name: &str,
105        alert_time: DateTime<Utc>,
106        callback: Option<TimeEventCallback>,
107        allow_past: Option<bool>,
108    ) -> anyhow::Result<()> {
109        self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
110    }
111
112    /// Set a timer to alert at the specified time.
113    ///
114    /// Any existing timer registered under the same `name` is cancelled with a warning before the new alert is scheduled.
115    ///
116    /// # Flags
117    ///
118    /// | `allow_past` | Behavior                                                                                |
119    /// |--------------|-----------------------------------------------------------------------------------------|
120    /// | `true`       | If alert time is **in the past**, the alert fires immediately; otherwise at alert time. |
121    /// | `false`      | Returns an error if alert time is earlier than now.                                     |
122    ///
123    /// # Callback
124    ///
125    /// - `callback`: Some, then callback handles the time event.
126    /// - `callback`: None, then the clock’s default time event callback is used.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if `name` is invalid, `alert_time_ns` is earlier than now when not allowed,
131    /// or any predicate check fails.
132    #[allow(clippy::too_many_arguments)]
133    fn set_time_alert_ns(
134        &mut self,
135        name: &str,
136        alert_time_ns: UnixNanos,
137        callback: Option<TimeEventCallback>,
138        allow_past: Option<bool>,
139    ) -> anyhow::Result<()>;
140
141    /// Set a timer to fire time events at every interval between start and stop time.
142    ///
143    /// Any existing timer registered under the same `name` is cancelled with a warning before the new timer is scheduled.
144    ///
145    /// See [`Clock::set_timer_ns`] for flag semantics.
146    ///
147    /// # Callback
148    ///
149    /// - `callback`: Some, then callback handles the time event.
150    /// - `callback`: None, then the clock’s default time event callback is used.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if `name` is invalid, `interval` is not positive,
155    /// or if any predicate check fails.
156    #[allow(clippy::too_many_arguments)]
157    fn set_timer(
158        &mut self,
159        name: &str,
160        interval: Duration,
161        start_time: Option<DateTime<Utc>>,
162        stop_time: Option<DateTime<Utc>>,
163        callback: Option<TimeEventCallback>,
164        allow_past: Option<bool>,
165        fire_immediately: Option<bool>,
166    ) -> anyhow::Result<()> {
167        self.set_timer_ns(
168            name,
169            interval.as_nanos() as u64,
170            start_time.map(UnixNanos::from),
171            stop_time.map(UnixNanos::from),
172            callback,
173            allow_past,
174            fire_immediately,
175        )
176    }
177
178    /// Set a timer to fire time events at every interval between start and stop time.
179    ///
180    /// Any existing timer registered under the same `name` is cancelled before the new timer is scheduled.
181    ///
182    /// # Start Time
183    ///
184    /// - `None` or `Some(0)`: Uses the current time as start time.
185    /// - `Some(non_zero)`: Uses the specified timestamp as start time.
186    ///
187    /// # Flags
188    ///
189    /// | `allow_past` | `fire_immediately` | Behavior                                                                              |
190    /// |--------------|--------------------|---------------------------------------------------------------------------------------|
191    /// | `true`       | `true`             | First event fires immediately at start time, even if start time is in the past.       |
192    /// | `true`       | `false`            | First event fires at start time + interval, even if start time is in the past.        |
193    /// | `false`      | `true`             | Returns error if start time is in the past (first event would be immediate but past). |
194    /// | `false`      | `false`            | Returns error if start time + interval is in the past.                                |
195    ///
196    /// # Callback
197    ///
198    /// - `callback`: Some, then callback handles the time event.
199    /// - `callback`: None, then the clock's default time event callback is used.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if `name` is invalid, `interval_ns` is not positive,
204    /// or if any predicate check fails.
205    #[allow(clippy::too_many_arguments)]
206    fn set_timer_ns(
207        &mut self,
208        name: &str,
209        interval_ns: u64,
210        start_time_ns: Option<UnixNanos>,
211        stop_time_ns: Option<UnixNanos>,
212        callback: Option<TimeEventCallback>,
213        allow_past: Option<bool>,
214        fire_immediately: Option<bool>,
215    ) -> anyhow::Result<()>;
216
217    /// Returns the time interval in which the timer `name` is triggered.
218    ///
219    /// If the timer doesn't exist `None` is returned.
220    fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
221
222    /// Cancels the timer with `name`.
223    fn cancel_timer(&mut self, name: &str);
224
225    /// Cancels all timers.
226    fn cancel_timers(&mut self);
227
228    /// Resets the clock by clearing it's internal state.
229    fn reset(&mut self);
230}
231
232/// A static test clock.
233///
234/// Stores the current timestamp internally which can be advanced.
235///
236/// # Threading
237///
238/// This clock is thread-affine; use it only from the thread that created it.
239#[derive(Debug)]
240pub struct TestClock {
241    time: AtomicTime,
242    // Use btree map to ensure stable ordering when scanning for timers in `advance_time`
243    timers: BTreeMap<Ustr, TestTimer>,
244    default_callback: Option<TimeEventCallback>,
245    callbacks: HashMap<Ustr, TimeEventCallback>,
246    heap: BinaryHeap<ScheduledTimeEvent>, // TODO: Deprecated - move to global time event heap
247}
248
249impl TestClock {
250    /// Creates a new [`TestClock`] instance.
251    #[must_use]
252    pub fn new() -> Self {
253        Self {
254            time: AtomicTime::new(false, UnixNanos::default()),
255            timers: BTreeMap::new(),
256            default_callback: None,
257            callbacks: HashMap::new(),
258            heap: BinaryHeap::new(),
259        }
260    }
261
262    /// Returns a reference to the internal timers for the clock.
263    #[must_use]
264    pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
265        &self.timers
266    }
267
268    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
269    ///
270    /// This function ensures that the clock behaves in a non-decreasing manner. If `set_time` is `true`,
271    /// the internal clock will be updated to the value of `to_time_ns`. Otherwise, the clock will advance
272    /// without explicitly setting the time.
273    ///
274    /// The method processes active timers, advancing them to `to_time_ns`, and collects any `TimeEvent`
275    /// objects that are triggered as a result. Only timers that are not expired are processed.
276    ///
277    /// # Warnings
278    ///
279    /// Logs a warning if >= 1,000,000 time events are allocated during advancement.
280    ///
281    /// # Panics
282    ///
283    /// Panics if `to_time_ns` is less than the current internal clock time.
284    pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
285        const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
286
287        let from_time_ns = self.time.get_time_ns();
288
289        // Time should be non-decreasing
290        assert!(
291            to_time_ns >= from_time_ns,
292            "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
293            from_time_ns
294        );
295
296        if set_time {
297            self.time.set_time(to_time_ns);
298        }
299
300        // Iterate and advance timers and collect events, only retain alive timers
301        let mut events: Vec<TimeEvent> = Vec::new();
302        self.timers.retain(|_, timer| {
303            timer.advance(to_time_ns).for_each(|event| {
304                events.push(event);
305            });
306
307            !timer.is_expired()
308        });
309
310        if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
311            log::warn!(
312                "Allocated {} time events during clock advancement from {} to {}, \
313                 consider stopping the timer between large time ranges with no data points",
314                events.len().separate_with_commas(),
315                from_time_ns,
316                to_time_ns
317            );
318        }
319
320        events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
321        events
322    }
323
324    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
325    ///
326    /// Pushes the [`TimeEvent`]s on the heap to ensure ordering
327    ///
328    /// Note: `set_time` is not used but present to keep backward compatible api call
329    ///
330    /// # Warnings
331    ///
332    /// Logs a warning when the internal heap already exceeds 100,000 scheduled events before pushing new ones.
333    ///
334    /// # Panics
335    ///
336    /// Panics if `to_time_ns` is less than the current internal clock time.
337    pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
338        const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
339
340        let from_time_ns = self.time.get_time_ns();
341
342        // Time should be non-decreasing
343        assert!(
344            to_time_ns >= from_time_ns,
345            "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
346            from_time_ns
347        );
348
349        self.time.set_time(to_time_ns);
350
351        if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
352            log::warn!(
353                "TestClock heap size {} exceeds recommended limit",
354                self.heap.len()
355            );
356        }
357
358        // Iterate and advance timers and push events to heap. Only retain alive timers.
359        self.timers.retain(|_, timer| {
360            timer.advance(to_time_ns).for_each(|event| {
361                self.heap.push(ScheduledTimeEvent::new(event));
362            });
363
364            !timer.is_expired()
365        });
366    }
367
368    /// Matches `TimeEvent` objects with their corresponding event handlers.
369    ///
370    /// This function takes an `events` vector of `TimeEvent` objects, assumes they are already sorted
371    /// by their `ts_event`, and matches them with the appropriate callback handler from the internal
372    /// registry of callbacks. If no specific callback is found for an event, the default callback is used.
373    ///
374    /// # Panics
375    ///
376    /// Panics if the default callback is not set for the clock when matching handlers.
377    #[must_use]
378    pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
379        events
380            .into_iter()
381            .map(|event| {
382                let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
383                    // If callback_py is None, use the default_callback_py
384                    // TODO: clone for now
385                    self.default_callback
386                        .clone()
387                        .expect("Default callback should exist")
388                });
389                TimeEventHandlerV2::new(event, callback)
390            })
391            .collect()
392    }
393}
394
395impl Iterator for TestClock {
396    type Item = TimeEventHandlerV2;
397
398    fn next(&mut self) -> Option<Self::Item> {
399        self.heap
400            .pop()
401            .map(|event| self.get_handler(event.into_inner()))
402    }
403}
404
405impl Default for TestClock {
406    /// Creates a new default [`TestClock`] instance.
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
412impl Deref for TestClock {
413    type Target = AtomicTime;
414
415    fn deref(&self) -> &Self::Target {
416        &self.time
417    }
418}
419
420impl Clock for TestClock {
421    fn timestamp_ns(&self) -> UnixNanos {
422        self.time.get_time_ns()
423    }
424
425    fn timestamp_us(&self) -> u64 {
426        self.time.get_time_us()
427    }
428
429    fn timestamp_ms(&self) -> u64 {
430        self.time.get_time_ms()
431    }
432
433    fn timestamp(&self) -> f64 {
434        self.time.get_time()
435    }
436
437    fn timer_names(&self) -> Vec<&str> {
438        self.timers
439            .iter()
440            .filter(|(_, timer)| !timer.is_expired())
441            .map(|(k, _)| k.as_str())
442            .collect()
443    }
444
445    fn timer_count(&self) -> usize {
446        self.timers
447            .iter()
448            .filter(|(_, timer)| !timer.is_expired())
449            .count()
450    }
451
452    fn timer_exists(&self, name: &Ustr) -> bool {
453        self.timers.contains_key(name)
454    }
455
456    fn register_default_handler(&mut self, callback: TimeEventCallback) {
457        self.default_callback = Some(callback);
458    }
459
460    /// Returns the handler for the given `TimeEvent`.
461    ///
462    /// # Panics
463    ///
464    /// Panics if no event-specific or default callback has been registered for the event.
465    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
466        // Get the callback from either the event-specific callbacks or default callback
467        let callback = self
468            .callbacks
469            .get(&event.name)
470            .cloned()
471            .or_else(|| self.default_callback.clone())
472            .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
473
474        TimeEventHandlerV2::new(event, callback)
475    }
476
477    fn set_time_alert_ns(
478        &mut self,
479        name: &str,
480        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
481        callback: Option<TimeEventCallback>,
482        allow_past: Option<bool>,
483    ) -> anyhow::Result<()> {
484        check_valid_string_ascii(name, stringify!(name))?;
485
486        let name = Ustr::from(name);
487        let allow_past = allow_past.unwrap_or(true);
488
489        if self.timer_exists(&name) {
490            self.cancel_timer(name.as_str());
491            log::warn!("Timer '{name}' replaced");
492        }
493
494        check_predicate_true(
495            callback.is_some()
496                | self.callbacks.contains_key(&name)
497                | self.default_callback.is_some(),
498            "No callbacks provided",
499        )?;
500
501        match callback {
502            Some(callback_py) => self.callbacks.insert(name, callback_py),
503            None => None,
504        };
505
506        let ts_now = self.get_time_ns();
507
508        if alert_time_ns < ts_now {
509            if allow_past {
510                alert_time_ns = ts_now;
511                log::warn!(
512                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
513                    alert_time_ns.to_rfc3339(),
514                );
515            } else {
516                anyhow::bail!(
517                    "Timer '{name}' alert time {} was in the past (current time is {})",
518                    alert_time_ns.to_rfc3339(),
519                    ts_now.to_rfc3339(),
520                );
521            }
522        }
523
524        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
525        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
526        // When alert time equals current time, fire immediately
527        let fire_immediately = alert_time_ns == ts_now;
528
529        let timer = TestTimer::new(
530            name,
531            interval_ns,
532            ts_now,
533            Some(alert_time_ns),
534            fire_immediately,
535        );
536        self.timers.insert(name, timer);
537
538        Ok(())
539    }
540
541    fn set_timer_ns(
542        &mut self,
543        name: &str,
544        interval_ns: u64,
545        start_time_ns: Option<UnixNanos>,
546        stop_time_ns: Option<UnixNanos>,
547        callback: Option<TimeEventCallback>,
548        allow_past: Option<bool>,
549        fire_immediately: Option<bool>,
550    ) -> anyhow::Result<()> {
551        check_valid_string_ascii(name, stringify!(name))?;
552        check_positive_u64(interval_ns, stringify!(interval_ns))?;
553        check_predicate_true(
554            callback.is_some() | self.default_callback.is_some(),
555            "No callbacks provided",
556        )?;
557
558        let name = Ustr::from(name);
559        let allow_past = allow_past.unwrap_or(true);
560        let fire_immediately = fire_immediately.unwrap_or(false);
561
562        if self.timer_exists(&name) {
563            self.cancel_timer(name.as_str());
564            log::warn!("Timer '{name}' replaced");
565        }
566
567        match callback {
568            Some(callback_py) => self.callbacks.insert(name, callback_py),
569            None => None,
570        };
571
572        let mut start_time_ns = start_time_ns.unwrap_or_default();
573        let ts_now = self.get_time_ns();
574
575        if start_time_ns == 0 {
576            // Zero start time indicates no explicit start; we use the current time
577            start_time_ns = self.timestamp_ns();
578        } else if !allow_past {
579            // Calculate the next event time based on fire_immediately flag
580            let next_event_time = if fire_immediately {
581                start_time_ns
582            } else {
583                start_time_ns + interval_ns
584            };
585
586            // Check if the next event would be in the past
587            if next_event_time < ts_now {
588                anyhow::bail!(
589                    "Timer '{name}' next event time {} would be in the past (current time is {})",
590                    next_event_time.to_rfc3339(),
591                    ts_now.to_rfc3339(),
592                );
593            }
594        }
595
596        if let Some(stop_time) = stop_time_ns {
597            if stop_time <= start_time_ns {
598                anyhow::bail!(
599                    "Timer '{name}' stop time {} must be after start time {}",
600                    stop_time.to_rfc3339(),
601                    start_time_ns.to_rfc3339(),
602                );
603            }
604            if !allow_past && stop_time <= ts_now {
605                anyhow::bail!(
606                    "Timer '{name}' stop time {} is in the past (current time is {})",
607                    stop_time.to_rfc3339(),
608                    ts_now.to_rfc3339(),
609                );
610            }
611        }
612
613        let interval_ns = create_valid_interval(interval_ns);
614
615        let timer = TestTimer::new(
616            name,
617            interval_ns,
618            start_time_ns,
619            stop_time_ns,
620            fire_immediately,
621        );
622        self.timers.insert(name, timer);
623
624        Ok(())
625    }
626
627    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
628        self.timers
629            .get(&Ustr::from(name))
630            .map(|timer| timer.next_time_ns())
631    }
632
633    fn cancel_timer(&mut self, name: &str) {
634        let timer = self.timers.remove(&Ustr::from(name));
635        if let Some(mut timer) = timer {
636            timer.cancel();
637        }
638    }
639
640    fn cancel_timers(&mut self) {
641        for timer in &mut self.timers.values_mut() {
642            timer.cancel();
643        }
644
645        self.timers.clear();
646    }
647
648    fn reset(&mut self) {
649        self.time = AtomicTime::new(false, UnixNanos::default());
650        self.timers = BTreeMap::new();
651        self.heap = BinaryHeap::new();
652        self.callbacks = HashMap::new();
653    }
654}
655
656/// A real-time clock which uses system time.
657///
658/// Timestamps are guaranteed to be unique and monotonically increasing.
659///
660/// # Threading
661///
662/// The clock holds thread-local runtime state and must remain on its originating thread.
663#[derive(Debug)]
664pub struct LiveClock {
665    time: &'static AtomicTime,
666    timers: HashMap<Ustr, LiveTimer>,
667    default_callback: Option<TimeEventCallback>,
668    callbacks: HashMap<Ustr, TimeEventCallback>,
669    sender: Option<Arc<dyn TimeEventSender>>,
670}
671
672impl LiveClock {
673    /// Creates a new [`LiveClock`] instance.
674    #[must_use]
675    pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
676        Self {
677            time: get_atomic_clock_realtime(),
678            timers: HashMap::new(),
679            default_callback: None,
680            callbacks: HashMap::new(),
681            sender,
682        }
683    }
684
685    #[must_use]
686    pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
687        &self.timers
688    }
689
690    // Clean up expired timers. Retain only live ones
691    fn clear_expired_timers(&mut self) {
692        self.timers.retain(|_, timer| !timer.is_expired());
693    }
694}
695
696impl Default for LiveClock {
697    /// Creates a new default [`LiveClock`] instance.
698    fn default() -> Self {
699        Self::new(Some(get_time_event_sender()))
700    }
701}
702
703impl Deref for LiveClock {
704    type Target = AtomicTime;
705
706    fn deref(&self) -> &Self::Target {
707        self.time
708    }
709}
710
711impl Clock for LiveClock {
712    fn timestamp_ns(&self) -> UnixNanos {
713        self.time.get_time_ns()
714    }
715
716    fn timestamp_us(&self) -> u64 {
717        self.time.get_time_us()
718    }
719
720    fn timestamp_ms(&self) -> u64 {
721        self.time.get_time_ms()
722    }
723
724    fn timestamp(&self) -> f64 {
725        self.time.get_time()
726    }
727
728    fn timer_names(&self) -> Vec<&str> {
729        self.timers
730            .iter()
731            .filter(|(_, timer)| !timer.is_expired())
732            .map(|(k, _)| k.as_str())
733            .collect()
734    }
735
736    fn timer_count(&self) -> usize {
737        self.timers
738            .iter()
739            .filter(|(_, timer)| !timer.is_expired())
740            .count()
741    }
742
743    fn timer_exists(&self, name: &Ustr) -> bool {
744        self.timers.contains_key(name)
745    }
746
747    fn register_default_handler(&mut self, handler: TimeEventCallback) {
748        self.default_callback = Some(handler);
749    }
750
751    /// # Panics
752    ///
753    /// This function panics if:
754    /// - The event does not have an associated handler (see trait documentation).
755    #[allow(unused_variables)]
756    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
757        // Get the callback from either the event-specific callbacks or default callback
758        let callback = self
759            .callbacks
760            .get(&event.name)
761            .cloned()
762            .or_else(|| self.default_callback.clone())
763            .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
764
765        TimeEventHandlerV2::new(event, callback)
766    }
767
768    fn set_time_alert_ns(
769        &mut self,
770        name: &str,
771        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
772        callback: Option<TimeEventCallback>,
773        allow_past: Option<bool>,
774    ) -> anyhow::Result<()> {
775        check_valid_string_ascii(name, stringify!(name))?;
776
777        let name = Ustr::from(name);
778        let allow_past = allow_past.unwrap_or(true);
779
780        if self.timer_exists(&name) {
781            self.cancel_timer(name.as_str());
782            log::warn!("Timer '{name}' replaced");
783        }
784
785        check_predicate_true(
786            callback.is_some()
787                | self.callbacks.contains_key(&name)
788                | self.default_callback.is_some(),
789            "No callbacks provided",
790        )?;
791
792        let callback = if let Some(callback) = callback {
793            self.callbacks.insert(name, callback.clone());
794            callback
795        } else if let Some(existing) = self.callbacks.get(&name) {
796            existing.clone()
797        } else {
798            let default = self
799                .default_callback
800                .clone()
801                .expect("Default callback should exist");
802            self.callbacks.insert(name, default.clone());
803            default
804        };
805
806        let ts_now = self.get_time_ns();
807
808        // Handle past timestamps based on flag
809        if alert_time_ns < ts_now {
810            if allow_past {
811                alert_time_ns = ts_now;
812                log::warn!(
813                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
814                    alert_time_ns.to_rfc3339(),
815                );
816            } else {
817                anyhow::bail!(
818                    "Timer '{name}' alert time {} was in the past (current time is {})",
819                    alert_time_ns.to_rfc3339(),
820                    ts_now.to_rfc3339(),
821                );
822            }
823        }
824
825        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
826        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
827
828        let mut timer = LiveTimer::new(
829            name,
830            interval_ns,
831            ts_now,
832            Some(alert_time_ns),
833            callback,
834            false,
835            self.sender.clone(),
836        );
837
838        timer.start();
839
840        self.clear_expired_timers();
841        self.timers.insert(name, timer);
842
843        Ok(())
844    }
845
846    fn set_timer_ns(
847        &mut self,
848        name: &str,
849        interval_ns: u64,
850        start_time_ns: Option<UnixNanos>,
851        stop_time_ns: Option<UnixNanos>,
852        callback: Option<TimeEventCallback>,
853        allow_past: Option<bool>,
854        fire_immediately: Option<bool>,
855    ) -> anyhow::Result<()> {
856        check_valid_string_ascii(name, stringify!(name))?;
857        check_positive_u64(interval_ns, stringify!(interval_ns))?;
858        check_predicate_true(
859            callback.is_some() | self.default_callback.is_some(),
860            "No callbacks provided",
861        )?;
862
863        let name = Ustr::from(name);
864        let allow_past = allow_past.unwrap_or(true);
865        let fire_immediately = fire_immediately.unwrap_or(false);
866
867        if self.timer_exists(&name) {
868            self.cancel_timer(name.as_str());
869            log::warn!("Timer '{name}' replaced");
870        }
871
872        let callback = match callback {
873            Some(callback) => callback,
874            None => self.default_callback.clone().unwrap(),
875        };
876
877        self.callbacks.insert(name, callback.clone());
878
879        let mut start_time_ns = start_time_ns.unwrap_or_default();
880        let ts_now = self.get_time_ns();
881
882        if start_time_ns == 0 {
883            // Zero start time indicates no explicit start; we use the current time
884            start_time_ns = self.timestamp_ns();
885        } else if start_time_ns < ts_now && !allow_past {
886            anyhow::bail!(
887                "Timer '{name}' start time {} was in the past (current time is {})",
888                start_time_ns.to_rfc3339(),
889                ts_now.to_rfc3339(),
890            );
891        }
892
893        if let Some(stop_time) = stop_time_ns {
894            if stop_time <= start_time_ns {
895                anyhow::bail!(
896                    "Timer '{name}' stop time {} must be after start time {}",
897                    stop_time.to_rfc3339(),
898                    start_time_ns.to_rfc3339(),
899                );
900            }
901            if !allow_past && stop_time <= ts_now {
902                anyhow::bail!(
903                    "Timer '{name}' stop time {} is in the past (current time is {})",
904                    stop_time.to_rfc3339(),
905                    ts_now.to_rfc3339(),
906                );
907            }
908        }
909
910        let interval_ns = create_valid_interval(interval_ns);
911
912        let mut timer = LiveTimer::new(
913            name,
914            interval_ns,
915            start_time_ns,
916            stop_time_ns,
917            callback,
918            fire_immediately,
919            self.sender.clone(),
920        );
921        timer.start();
922
923        self.clear_expired_timers();
924        self.timers.insert(name, timer);
925
926        Ok(())
927    }
928
929    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
930        self.timers
931            .get(&Ustr::from(name))
932            .map(|timer| timer.next_time_ns())
933    }
934
935    fn cancel_timer(&mut self, name: &str) {
936        let timer = self.timers.remove(&Ustr::from(name));
937        if let Some(mut timer) = timer {
938            timer.cancel();
939        }
940    }
941
942    fn cancel_timers(&mut self) {
943        for timer in &mut self.timers.values_mut() {
944            timer.cancel();
945        }
946
947        self.timers.clear();
948    }
949
950    fn reset(&mut self) {
951        self.cancel_timers();
952        self.callbacks.clear();
953    }
954}
955
956// Helper struct to stream events from the heap
957#[derive(Debug)]
958pub struct TimeEventStream {
959    heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>,
960}
961
962impl TimeEventStream {
963    pub const fn new(heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
964        Self { heap }
965    }
966}
967
968impl Stream for TimeEventStream {
969    type Item = TimeEvent;
970
971    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972        let mut heap = match self.heap.try_lock() {
973            Ok(guard) => guard,
974            Err(e) => {
975                tracing::error!("Unable to get LiveClock heap lock: {e}");
976                cx.waker().wake_by_ref();
977                return Poll::Pending;
978            }
979        };
980
981        if let Some(event) = heap.pop() {
982            Poll::Ready(Some(event.into_inner()))
983        } else {
984            cx.waker().wake_by_ref();
985            Poll::Pending
986        }
987    }
988}
989
990////////////////////////////////////////////////////////////////////////////////
991// Tests
992////////////////////////////////////////////////////////////////////////////////
993
994#[cfg(test)]
995mod tests {
996    use std::{
997        sync::{Arc, Mutex},
998        time::Duration,
999    };
1000
1001    use nautilus_core::{MUTEX_POISONED, time::get_atomic_clock_realtime};
1002    use rstest::{fixture, rstest};
1003    use ustr::Ustr;
1004
1005    use super::*;
1006    use crate::{runner::TimeEventSender, testing::wait_until};
1007
1008    #[derive(Debug, Default)]
1009    struct TestCallback {
1010        /// Shared flag updated from within the timer callback; Mutex keeps the closure `Send` for tests.
1011        called: Arc<Mutex<bool>>,
1012    }
1013
1014    impl TestCallback {
1015        fn new(called: Arc<Mutex<bool>>) -> Self {
1016            Self { called }
1017        }
1018    }
1019
1020    impl From<TestCallback> for TimeEventCallback {
1021        fn from(callback: TestCallback) -> Self {
1022            Self::from(move |_event: TimeEvent| {
1023                if let Ok(mut called) = callback.called.lock() {
1024                    *called = true;
1025                }
1026            })
1027        }
1028    }
1029
1030    #[derive(Debug)]
1031    struct CollectingSender {
1032        events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1033    }
1034
1035    impl CollectingSender {
1036        fn new(events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>) -> Self {
1037            Self { events }
1038        }
1039    }
1040
1041    impl TimeEventSender for CollectingSender {
1042        fn send(&self, handler: TimeEventHandlerV2) {
1043            let TimeEventHandlerV2 { event, callback } = handler;
1044            let now_ns = get_atomic_clock_realtime().get_time_ns();
1045            let event_clone = event.clone();
1046            callback.call(event);
1047            self.events
1048                .lock()
1049                .expect(MUTEX_POISONED)
1050                .push((event_clone, now_ns));
1051        }
1052    }
1053
1054    fn wait_for_events(
1055        events: &Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1056        target: usize,
1057        timeout: Duration,
1058    ) {
1059        wait_until(
1060            || events.lock().expect(MUTEX_POISONED).len() >= target,
1061            timeout,
1062        );
1063    }
1064
1065    #[fixture]
1066    pub fn test_clock() -> TestClock {
1067        let mut clock = TestClock::new();
1068        clock.register_default_handler(TestCallback::default().into());
1069        clock
1070    }
1071
1072    #[rstest]
1073    fn test_time_monotonicity(mut test_clock: TestClock) {
1074        let initial_time = test_clock.timestamp_ns();
1075        test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
1076        assert!(test_clock.timestamp_ns() > initial_time);
1077    }
1078
1079    #[rstest]
1080    fn test_timer_registration(mut test_clock: TestClock) {
1081        test_clock
1082            .set_time_alert_ns(
1083                "test_timer",
1084                (*test_clock.timestamp_ns() + 1000).into(),
1085                None,
1086                None,
1087            )
1088            .unwrap();
1089        assert_eq!(test_clock.timer_count(), 1);
1090        assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
1091    }
1092
1093    #[rstest]
1094    fn test_timer_expiration(mut test_clock: TestClock) {
1095        let alert_time = (*test_clock.timestamp_ns() + 1000).into();
1096        test_clock
1097            .set_time_alert_ns("test_timer", alert_time, None, None)
1098            .unwrap();
1099        let events = test_clock.advance_time(alert_time, true);
1100        assert_eq!(events.len(), 1);
1101        assert_eq!(events[0].name.as_str(), "test_timer");
1102    }
1103
1104    #[rstest]
1105    fn test_timer_cancellation(mut test_clock: TestClock) {
1106        test_clock
1107            .set_time_alert_ns(
1108                "test_timer",
1109                (*test_clock.timestamp_ns() + 1000).into(),
1110                None,
1111                None,
1112            )
1113            .unwrap();
1114        assert_eq!(test_clock.timer_count(), 1);
1115        test_clock.cancel_timer("test_timer");
1116        assert_eq!(test_clock.timer_count(), 0);
1117    }
1118
1119    #[rstest]
1120    fn test_time_advancement(mut test_clock: TestClock) {
1121        let start_time = test_clock.timestamp_ns();
1122        test_clock
1123            .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
1124            .unwrap();
1125        let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
1126        assert_eq!(events.len(), 2);
1127        assert_eq!(*events[0].ts_event, *start_time + 1000);
1128        assert_eq!(*events[1].ts_event, *start_time + 2000);
1129    }
1130
1131    #[rstest]
1132    fn test_default_and_custom_callbacks() {
1133        let mut clock = TestClock::new();
1134        let default_called = Arc::new(Mutex::new(false));
1135        let custom_called = Arc::new(Mutex::new(false));
1136
1137        let default_callback = TestCallback::new(Arc::clone(&default_called));
1138        let custom_callback = TestCallback::new(Arc::clone(&custom_called));
1139
1140        clock.register_default_handler(TimeEventCallback::from(default_callback));
1141        clock
1142            .set_time_alert_ns(
1143                "default_timer",
1144                (*clock.timestamp_ns() + 1000).into(),
1145                None,
1146                None,
1147            )
1148            .unwrap();
1149        clock
1150            .set_time_alert_ns(
1151                "custom_timer",
1152                (*clock.timestamp_ns() + 1000).into(),
1153                Some(TimeEventCallback::from(custom_callback)),
1154                None,
1155            )
1156            .unwrap();
1157
1158        let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
1159        let handlers = clock.match_handlers(events);
1160
1161        for handler in handlers {
1162            handler.callback.call(handler.event);
1163        }
1164
1165        assert!(*default_called.lock().expect(MUTEX_POISONED));
1166        assert!(*custom_called.lock().expect(MUTEX_POISONED));
1167    }
1168
1169    #[rstest]
1170    fn test_multiple_timers(mut test_clock: TestClock) {
1171        let start_time = test_clock.timestamp_ns();
1172        test_clock
1173            .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1174            .unwrap();
1175        test_clock
1176            .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1177            .unwrap();
1178        let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
1179        assert_eq!(events.len(), 3);
1180        assert_eq!(events[0].name.as_str(), "timer1");
1181        assert_eq!(events[1].name.as_str(), "timer1");
1182        assert_eq!(events[2].name.as_str(), "timer2");
1183    }
1184
1185    #[rstest]
1186    fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1187        test_clock.set_time(UnixNanos::from(2000));
1188        let current_time = test_clock.timestamp_ns();
1189        let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1190
1191        // With allow_past=true (default), should adjust to current time and succeed
1192        test_clock
1193            .set_time_alert_ns("past_timer", past_time, None, Some(true))
1194            .unwrap();
1195
1196        // Verify timer was created with adjusted time
1197        assert_eq!(test_clock.timer_count(), 1);
1198        assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1199
1200        // Next time should be at or after current time, not in the past
1201        let next_time = test_clock.next_time_ns("past_timer").unwrap();
1202        assert!(next_time >= current_time);
1203    }
1204
1205    #[rstest]
1206    fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1207        test_clock.set_time(UnixNanos::from(2000));
1208        let current_time = test_clock.timestamp_ns();
1209        let past_time = current_time - 1000;
1210
1211        // With allow_past=false, should fail for past times
1212        let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1213
1214        // Verify the operation failed with appropriate error
1215        assert!(result.is_err());
1216        assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1217
1218        // Verify no timer was created
1219        assert_eq!(test_clock.timer_count(), 0);
1220        assert!(test_clock.timer_names().is_empty());
1221    }
1222
1223    #[rstest]
1224    fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1225        test_clock.set_time(UnixNanos::from(2000));
1226        let current_time = test_clock.timestamp_ns();
1227        let start_time = current_time + 1000;
1228        let stop_time = current_time + 500; // Stop time before start time
1229
1230        // Should fail because stop_time < start_time
1231        let result = test_clock.set_timer_ns(
1232            "invalid_timer",
1233            100,
1234            Some(start_time),
1235            Some(stop_time),
1236            None,
1237            None,
1238            None,
1239        );
1240
1241        // Verify the operation failed with appropriate error
1242        assert!(result.is_err());
1243        assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1244
1245        // Verify no timer was created
1246        assert_eq!(test_clock.timer_count(), 0);
1247    }
1248
1249    #[rstest]
1250    fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1251        let start_time = test_clock.timestamp_ns();
1252        let interval_ns = 1000;
1253
1254        test_clock
1255            .set_timer_ns(
1256                "fire_immediately_timer",
1257                interval_ns,
1258                Some(start_time),
1259                None,
1260                None,
1261                None,
1262                Some(true),
1263            )
1264            .unwrap();
1265
1266        // Advance time to check immediate firing and subsequent intervals
1267        let events = test_clock.advance_time(start_time + 2500, true);
1268
1269        // Should fire immediately at start_time (0), then at start_time+1000, then at start_time+2000
1270        assert_eq!(events.len(), 3);
1271        assert_eq!(*events[0].ts_event, *start_time); // Fires immediately
1272        assert_eq!(*events[1].ts_event, *start_time + 1000); // Then after interval
1273        assert_eq!(*events[2].ts_event, *start_time + 2000); // Then after second interval
1274    }
1275
1276    #[rstest]
1277    fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1278        let start_time = test_clock.timestamp_ns();
1279        let interval_ns = 1000;
1280
1281        test_clock
1282            .set_timer_ns(
1283                "normal_timer",
1284                interval_ns,
1285                Some(start_time),
1286                None,
1287                None,
1288                None,
1289                Some(false),
1290            )
1291            .unwrap();
1292
1293        // Advance time to check normal behavior
1294        let events = test_clock.advance_time(start_time + 2500, true);
1295
1296        // Should fire after first interval, not immediately
1297        assert_eq!(events.len(), 2);
1298        assert_eq!(*events[0].ts_event, *start_time + 1000); // Fires after first interval
1299        assert_eq!(*events[1].ts_event, *start_time + 2000); // Then after second interval
1300    }
1301
1302    #[rstest]
1303    fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1304        let start_time = test_clock.timestamp_ns();
1305        let interval_ns = 1000;
1306
1307        // Don't specify fire_immediately (should default to false)
1308        test_clock
1309            .set_timer_ns(
1310                "default_timer",
1311                interval_ns,
1312                Some(start_time),
1313                None,
1314                None,
1315                None,
1316                None,
1317            )
1318            .unwrap();
1319
1320        let events = test_clock.advance_time(start_time + 1500, true);
1321
1322        // Should behave the same as fire_immediately=false
1323        assert_eq!(events.len(), 1);
1324        assert_eq!(*events[0].ts_event, *start_time + 1000); // Fires after first interval
1325    }
1326
1327    #[rstest]
1328    fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1329        test_clock.set_time(5000.into());
1330        let interval_ns = 1000;
1331
1332        test_clock
1333            .set_timer_ns(
1334                "zero_start_timer",
1335                interval_ns,
1336                None,
1337                None,
1338                None,
1339                None,
1340                Some(true),
1341            )
1342            .unwrap();
1343
1344        let events = test_clock.advance_time(UnixNanos::from(7000), true);
1345
1346        // With zero start time, should use current time as start
1347        // Fire immediately at current time (5000), then at 6000, 7000
1348        assert_eq!(events.len(), 3);
1349        assert_eq!(*events[0].ts_event, 5000); // Immediate fire at current time
1350        assert_eq!(*events[1].ts_event, 6000);
1351        assert_eq!(*events[2].ts_event, 7000);
1352    }
1353
1354    #[rstest]
1355    fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1356        let start_time = test_clock.timestamp_ns();
1357        let interval_ns = 1000;
1358
1359        // One timer with fire_immediately=true
1360        test_clock
1361            .set_timer_ns(
1362                "immediate_timer",
1363                interval_ns,
1364                Some(start_time),
1365                None,
1366                None,
1367                None,
1368                Some(true),
1369            )
1370            .unwrap();
1371
1372        // One timer with fire_immediately=false
1373        test_clock
1374            .set_timer_ns(
1375                "normal_timer",
1376                interval_ns,
1377                Some(start_time),
1378                None,
1379                None,
1380                None,
1381                Some(false),
1382            )
1383            .unwrap();
1384
1385        let events = test_clock.advance_time(start_time + 1500, true);
1386
1387        // Should have 3 events total: immediate_timer fires at start & 1000, normal_timer fires at 1000
1388        assert_eq!(events.len(), 3);
1389
1390        // Sort events by timestamp to check order
1391        let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1392        event_times.sort();
1393
1394        assert_eq!(event_times[0], start_time.as_u64()); // immediate_timer fires immediately
1395        assert_eq!(event_times[1], start_time.as_u64() + 1000); // both timers fire at 1000
1396        assert_eq!(event_times[2], start_time.as_u64() + 1000); // both timers fire at 1000
1397    }
1398
1399    #[rstest]
1400    fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1401        let start_time = test_clock.timestamp_ns();
1402
1403        // Set first timer
1404        test_clock
1405            .set_timer_ns(
1406                "collision_timer",
1407                1000,
1408                Some(start_time),
1409                None,
1410                None,
1411                None,
1412                None,
1413            )
1414            .unwrap();
1415
1416        // Setting timer with same name should overwrite the existing one
1417        let result = test_clock.set_timer_ns(
1418            "collision_timer",
1419            2000,
1420            Some(start_time),
1421            None,
1422            None,
1423            None,
1424            None,
1425        );
1426
1427        assert!(result.is_ok());
1428        // Should still only have one timer (overwritten)
1429        assert_eq!(test_clock.timer_count(), 1);
1430
1431        // The timer should have the new interval
1432        let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1433        // With interval 2000 and start at start_time, next time should be start_time + 2000
1434        assert_eq!(next_time, start_time + 2000);
1435    }
1436
1437    #[rstest]
1438    fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1439        let start_time = test_clock.timestamp_ns();
1440
1441        // Attempt to set timer with zero interval should fail
1442        let result =
1443            test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1444
1445        assert!(result.is_err());
1446        assert_eq!(test_clock.timer_count(), 0);
1447    }
1448
1449    #[rstest]
1450    fn test_timer_empty_name_error(mut test_clock: TestClock) {
1451        let start_time = test_clock.timestamp_ns();
1452
1453        // Attempt to set timer with empty name should fail
1454        let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1455
1456        assert!(result.is_err());
1457        assert_eq!(test_clock.timer_count(), 0);
1458    }
1459
1460    #[rstest]
1461    fn test_timer_exists(mut test_clock: TestClock) {
1462        let name = Ustr::from("exists_timer");
1463        assert!(!test_clock.timer_exists(&name));
1464
1465        test_clock
1466            .set_time_alert_ns(
1467                name.as_str(),
1468                (*test_clock.timestamp_ns() + 1_000).into(),
1469                None,
1470                None,
1471            )
1472            .unwrap();
1473
1474        assert!(test_clock.timer_exists(&name));
1475    }
1476
1477    #[rstest]
1478    fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1479        test_clock.set_time(UnixNanos::from(10_000));
1480        let current = test_clock.timestamp_ns();
1481
1482        let result = test_clock.set_timer_ns(
1483            "past_stop",
1484            10_000,
1485            Some(current - 500),
1486            Some(current - 100),
1487            None,
1488            Some(false),
1489            None,
1490        );
1491
1492        let err = result.expect_err("expected stop time validation error");
1493        let err_msg = err.to_string();
1494        assert!(err_msg.contains("stop time"));
1495        assert!(err_msg.contains("in the past"));
1496    }
1497
1498    #[rstest]
1499    fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1500        let current = test_clock.timestamp_ns();
1501
1502        let result = test_clock.set_timer_ns(
1503            "future_stop",
1504            1_000,
1505            Some(current),
1506            Some(current + 10_000),
1507            None,
1508            Some(false),
1509            None,
1510        );
1511
1512        assert!(result.is_ok());
1513    }
1514
1515    #[rstest]
1516    fn test_live_clock_timer_replacement_cancels_previous_task() {
1517        let events = Arc::new(Mutex::new(Vec::new()));
1518        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1519
1520        let mut clock = LiveClock::new(Some(sender));
1521        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1522
1523        let fast_interval = Duration::from_millis(10).as_nanos() as u64;
1524        clock
1525            .set_timer_ns("replace", fast_interval, None, None, None, None, None)
1526            .unwrap();
1527
1528        wait_for_events(&events, 2, Duration::from_millis(200));
1529        events.lock().expect(MUTEX_POISONED).clear();
1530
1531        let slow_interval = Duration::from_millis(30).as_nanos() as u64;
1532        clock
1533            .set_timer_ns("replace", slow_interval, None, None, None, None, None)
1534            .unwrap();
1535
1536        wait_for_events(&events, 3, Duration::from_millis(300));
1537
1538        let snapshot = events.lock().expect(MUTEX_POISONED).clone();
1539        let diffs: Vec<u64> = snapshot
1540            .windows(2)
1541            .map(|pair| pair[1].0.ts_event.as_u64() - pair[0].0.ts_event.as_u64())
1542            .collect();
1543
1544        assert!(!diffs.is_empty());
1545        for diff in diffs {
1546            assert_ne!(diff, fast_interval);
1547        }
1548
1549        clock.cancel_timers();
1550    }
1551
1552    #[rstest]
1553    fn test_live_clock_time_alert_persists_callback() {
1554        let events = Arc::new(Mutex::new(Vec::new()));
1555        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1556
1557        let mut clock = LiveClock::new(Some(sender));
1558        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1559
1560        let now = clock.timestamp_ns();
1561        let alert_time = now + 1_000_u64;
1562
1563        clock
1564            .set_time_alert_ns("alert-callback", alert_time, None, None)
1565            .unwrap();
1566
1567        assert!(clock.callbacks.contains_key(&Ustr::from("alert-callback")));
1568
1569        clock.cancel_timers();
1570    }
1571
1572    #[rstest]
1573    fn test_live_clock_reset_stops_active_timers() {
1574        let events = Arc::new(Mutex::new(Vec::new()));
1575        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1576
1577        let mut clock = LiveClock::new(Some(sender));
1578        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1579
1580        clock
1581            .set_timer_ns(
1582                "reset-test",
1583                Duration::from_millis(15).as_nanos() as u64,
1584                None,
1585                None,
1586                None,
1587                None,
1588                None,
1589            )
1590            .unwrap();
1591
1592        wait_for_events(&events, 2, Duration::from_millis(250));
1593
1594        clock.reset();
1595
1596        // Wait for any in-flight events to arrive
1597        let start = std::time::Instant::now();
1598        wait_until(
1599            || start.elapsed() >= Duration::from_millis(50),
1600            Duration::from_secs(2),
1601        );
1602
1603        // Clear any events that arrived before reset took effect
1604        events.lock().expect(MUTEX_POISONED).clear();
1605
1606        // Verify no new events arrive (timer should be stopped)
1607        let start = std::time::Instant::now();
1608        wait_until(
1609            || start.elapsed() >= Duration::from_millis(50),
1610            Duration::from_secs(2),
1611        );
1612        assert!(events.lock().expect(MUTEX_POISONED).is_empty());
1613    }
1614
1615    #[rstest]
1616    fn test_live_timer_short_delay_not_early() {
1617        let events = Arc::new(Mutex::new(Vec::new()));
1618        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1619
1620        let mut clock = LiveClock::new(Some(sender));
1621        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1622
1623        let now = clock.timestamp_ns();
1624        let start_time = UnixNanos::from(*now + 500_000); // 0.5 ms in the future
1625        let interval_ns = 1_000_000;
1626
1627        clock
1628            .set_timer_ns(
1629                "short-delay",
1630                interval_ns,
1631                Some(start_time),
1632                None,
1633                None,
1634                None,
1635                Some(true),
1636            )
1637            .unwrap();
1638
1639        wait_for_events(&events, 1, Duration::from_millis(100));
1640
1641        let snapshot = events.lock().expect(MUTEX_POISONED).clone();
1642        assert!(!snapshot.is_empty());
1643
1644        for (event, actual_ts) in &snapshot {
1645            assert!(actual_ts.as_u64() >= event.ts_event.as_u64());
1646        }
1647
1648        clock.cancel_timers();
1649    }
1650
1651    #[rstest]
1652    fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1653        let start_time = test_clock.timestamp_ns();
1654        let interval_ns = 1000;
1655        let stop_time = start_time + interval_ns; // Stop exactly at first interval
1656
1657        test_clock
1658            .set_timer_ns(
1659                "exact_stop",
1660                interval_ns,
1661                Some(start_time),
1662                Some(stop_time),
1663                None,
1664                None,
1665                Some(true),
1666            )
1667            .unwrap();
1668
1669        let events = test_clock.advance_time(stop_time, true);
1670
1671        // Should fire immediately at start, then at stop time (which equals first interval)
1672        assert_eq!(events.len(), 2);
1673        assert_eq!(*events[0].ts_event, *start_time); // Immediate fire
1674        assert_eq!(*events[1].ts_event, *stop_time); // Fire at stop time
1675    }
1676
1677    #[rstest]
1678    fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1679        let start_time = test_clock.timestamp_ns();
1680        let interval_ns = 1000;
1681
1682        test_clock
1683            .set_timer_ns(
1684                "exact_advance",
1685                interval_ns,
1686                Some(start_time),
1687                None,
1688                None,
1689                None,
1690                Some(false),
1691            )
1692            .unwrap();
1693
1694        // Advance to exactly the next fire time
1695        let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1696        let events = test_clock.advance_time(next_time, true);
1697
1698        assert_eq!(events.len(), 1);
1699        assert_eq!(*events[0].ts_event, *next_time);
1700    }
1701
1702    #[rstest]
1703    fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1704        // Simulate bar aggregation scenario: current time is in middle of a bar window
1705        test_clock.set_time(UnixNanos::from(100_500)); // 100.5 seconds
1706
1707        let bar_start_time = UnixNanos::from(100_000); // 100 seconds (0.5 sec ago)
1708        let interval_ns = 1000; // 1 second bars
1709
1710        // With allow_past=false and fire_immediately=false:
1711        // start_time is in past (100 sec) but next event (101 sec) is in future
1712        // This should be ALLOWED for bar aggregation
1713        let result = test_clock.set_timer_ns(
1714            "bar_timer",
1715            interval_ns,
1716            Some(bar_start_time),
1717            None,
1718            None,
1719            Some(false), // allow_past = false
1720            Some(false), // fire_immediately = false
1721        );
1722
1723        // Should succeed because next event time (100_000 + 1000 = 101_000) > current time (100_500)
1724        assert!(result.is_ok());
1725        assert_eq!(test_clock.timer_count(), 1);
1726
1727        // Next event should be at bar_start_time + interval = 101_000
1728        let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1729        assert_eq!(*next_time, 101_000);
1730    }
1731
1732    #[rstest]
1733    fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1734        test_clock.set_time(UnixNanos::from(102_000)); // 102 seconds
1735
1736        let past_start_time = UnixNanos::from(100_000); // 100 seconds (2 sec ago)
1737        let interval_ns = 1000; // 1 second interval
1738
1739        // With allow_past=false and fire_immediately=false:
1740        // Next event would be 100_000 + 1000 = 101_000, which is < current time (102_000)
1741        // This should be REJECTED
1742        let result = test_clock.set_timer_ns(
1743            "past_event_timer",
1744            interval_ns,
1745            Some(past_start_time),
1746            None,
1747            None,
1748            Some(false), // allow_past = false
1749            Some(false), // fire_immediately = false
1750        );
1751
1752        // Should fail because next event time (101_000) < current time (102_000)
1753        assert!(result.is_err());
1754        assert!(
1755            result
1756                .unwrap_err()
1757                .to_string()
1758                .contains("would be in the past")
1759        );
1760    }
1761
1762    #[rstest]
1763    fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1764        test_clock.set_time(UnixNanos::from(100_500)); // 100.5 seconds
1765
1766        let past_start_time = UnixNanos::from(100_000); // 100 seconds (0.5 sec ago)
1767        let interval_ns = 1000;
1768
1769        // With fire_immediately=true, next event = start_time (which is in past)
1770        // This should be REJECTED with allow_past=false
1771        let result = test_clock.set_timer_ns(
1772            "immediate_past_timer",
1773            interval_ns,
1774            Some(past_start_time),
1775            None,
1776            None,
1777            Some(false), // allow_past = false
1778            Some(true),  // fire_immediately = true
1779        );
1780
1781        // Should fail because next event time (100_000) < current time (100_500)
1782        assert!(result.is_err());
1783        assert!(
1784            result
1785                .unwrap_err()
1786                .to_string()
1787                .contains("would be in the past")
1788        );
1789    }
1790
1791    #[rstest]
1792    fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1793        let start_time = test_clock.timestamp_ns();
1794
1795        test_clock
1796            .set_timer_ns(
1797                "cancel_test",
1798                1000,
1799                Some(start_time),
1800                None,
1801                None,
1802                None,
1803                None,
1804            )
1805            .unwrap();
1806
1807        assert_eq!(test_clock.timer_count(), 1);
1808
1809        // Cancel the timer
1810        test_clock.cancel_timer("cancel_test");
1811
1812        assert_eq!(test_clock.timer_count(), 0);
1813
1814        // Advance time - should get no events from cancelled timer
1815        let events = test_clock.advance_time(start_time + 2000, true);
1816        assert_eq!(events.len(), 0);
1817    }
1818
1819    #[rstest]
1820    fn test_cancel_all_timers(mut test_clock: TestClock) {
1821        // Create multiple timers
1822        test_clock
1823            .set_timer_ns("timer1", 1000, None, None, None, None, None)
1824            .unwrap();
1825        test_clock
1826            .set_timer_ns("timer2", 1500, None, None, None, None, None)
1827            .unwrap();
1828        test_clock
1829            .set_timer_ns("timer3", 2000, None, None, None, None, None)
1830            .unwrap();
1831
1832        assert_eq!(test_clock.timer_count(), 3);
1833
1834        // Cancel all timers
1835        test_clock.cancel_timers();
1836
1837        assert_eq!(test_clock.timer_count(), 0);
1838
1839        // Advance time - should get no events
1840        let events = test_clock.advance_time(UnixNanos::from(5000), true);
1841        assert_eq!(events.len(), 0);
1842    }
1843
1844    #[rstest]
1845    fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1846        test_clock
1847            .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1848            .unwrap();
1849
1850        assert_eq!(test_clock.timer_count(), 1);
1851
1852        // Reset the clock
1853        test_clock.reset();
1854
1855        assert_eq!(test_clock.timer_count(), 0);
1856        assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); // Time reset to zero
1857    }
1858
1859    #[rstest]
1860    fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1861        let current_time = test_clock.utc_now();
1862        let alert_time = current_time + chrono::Duration::seconds(1);
1863
1864        // Test the default implementation that delegates to set_time_alert_ns
1865        test_clock
1866            .set_time_alert("alert_test", alert_time, None, None)
1867            .unwrap();
1868
1869        assert_eq!(test_clock.timer_count(), 1);
1870        assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1871
1872        // Verify the timer is set for the correct time
1873        let expected_ns = UnixNanos::from(alert_time);
1874        let next_time = test_clock.next_time_ns("alert_test").unwrap();
1875
1876        // Should be very close (within a few nanoseconds due to conversion)
1877        let diff = if next_time >= expected_ns {
1878            next_time.as_u64() - expected_ns.as_u64()
1879        } else {
1880            expected_ns.as_u64() - next_time.as_u64()
1881        };
1882        assert!(
1883            diff < 1000,
1884            "Timer should be set within 1 microsecond of expected time"
1885        );
1886    }
1887
1888    #[rstest]
1889    fn test_set_timer_default_impl(mut test_clock: TestClock) {
1890        let current_time = test_clock.utc_now();
1891        let start_time = current_time + chrono::Duration::seconds(1);
1892        let interval = Duration::from_millis(500);
1893
1894        // Test the default implementation that delegates to set_timer_ns
1895        test_clock
1896            .set_timer(
1897                "timer_test",
1898                interval,
1899                Some(start_time),
1900                None,
1901                None,
1902                None,
1903                None,
1904            )
1905            .unwrap();
1906
1907        assert_eq!(test_clock.timer_count(), 1);
1908        assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1909
1910        // Advance time and verify timer fires at correct intervals
1911        let start_ns = UnixNanos::from(start_time);
1912        let interval_ns = interval.as_nanos() as u64;
1913
1914        let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1915        assert_eq!(events.len(), 3); // Should fire 3 times
1916
1917        // Verify timing
1918        assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1919        assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1920        assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1921    }
1922
1923    #[rstest]
1924    fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1925        let current_time = test_clock.utc_now();
1926        let start_time = current_time + chrono::Duration::seconds(1);
1927        let stop_time = current_time + chrono::Duration::seconds(3);
1928        let interval = Duration::from_secs(1);
1929
1930        // Test with stop time
1931        test_clock
1932            .set_timer(
1933                "timer_with_stop",
1934                interval,
1935                Some(start_time),
1936                Some(stop_time),
1937                None,
1938                None,
1939                None,
1940            )
1941            .unwrap();
1942
1943        assert_eq!(test_clock.timer_count(), 1);
1944
1945        // Advance beyond stop time
1946        let stop_ns = UnixNanos::from(stop_time);
1947        let events = test_clock.advance_time(stop_ns + 1000, true);
1948
1949        // Should fire twice: at start_time + 1s and start_time + 2s, but not at start_time + 3s since that would be at stop_time
1950        assert_eq!(events.len(), 2);
1951
1952        let start_ns = UnixNanos::from(start_time);
1953        let interval_ns = interval.as_nanos() as u64;
1954        assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1955        assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1956    }
1957
1958    #[rstest]
1959    fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1960        let current_time = test_clock.utc_now();
1961        let start_time = current_time + chrono::Duration::seconds(1);
1962        let interval = Duration::from_millis(500);
1963
1964        // Test with fire_immediately=true
1965        test_clock
1966            .set_timer(
1967                "immediate_timer",
1968                interval,
1969                Some(start_time),
1970                None,
1971                None,
1972                None,
1973                Some(true),
1974            )
1975            .unwrap();
1976
1977        let start_ns = UnixNanos::from(start_time);
1978        let interval_ns = interval.as_nanos() as u64;
1979
1980        // Advance to start time + 1 interval
1981        let events = test_clock.advance_time(start_ns + interval_ns, true);
1982
1983        // Should fire immediately at start_time, then again at start_time + interval
1984        assert_eq!(events.len(), 2);
1985        assert_eq!(*events[0].ts_event, *start_ns); // Immediate fire
1986        assert_eq!(*events[1].ts_event, *start_ns + interval_ns); // Regular interval
1987    }
1988
1989    #[rstest]
1990    fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1991        let current_time = test_clock.timestamp_ns();
1992
1993        // Set time alert for exactly the current time
1994        test_clock
1995            .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1996            .unwrap();
1997
1998        assert_eq!(test_clock.timer_count(), 1);
1999
2000        // Advance time by exactly 0 (to current time) - should fire immediately
2001        let events = test_clock.advance_time(current_time, true);
2002
2003        // Should fire immediately since alert_time_ns == ts_now
2004        assert_eq!(events.len(), 1);
2005        assert_eq!(events[0].name.as_str(), "alert_at_current_time");
2006        assert_eq!(*events[0].ts_event, *current_time);
2007    }
2008}