nautilus_common/
clock.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and static `Clock` implementations.
17
18use std::{
19    collections::{BTreeMap, BinaryHeap, HashMap},
20    fmt::Debug,
21    ops::Deref,
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25    time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31    AtomicTime, UnixNanos,
32    correctness::{check_positive_u64, check_predicate_true, check_valid_string_ascii},
33    time::get_atomic_clock_realtime,
34};
35use thousands::Separable;
36use tokio::sync::Mutex;
37use ustr::Ustr;
38
39use crate::{
40    runner::{TimeEventSender, get_time_event_sender},
41    timer::{
42        LiveTimer, ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
43        create_valid_interval,
44    },
45};
46
47/// Represents a type of clock.
48///
49/// # Notes
50///
51/// An active timer is one which has not expired (`timer.is_expired == False`).
52pub trait Clock: Debug {
53    /// Returns the current date and time as a timezone-aware `DateTime<UTC>`.
54    fn utc_now(&self) -> DateTime<Utc> {
55        DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
56    }
57
58    /// Returns the current UNIX timestamp in nanoseconds (ns).
59    fn timestamp_ns(&self) -> UnixNanos;
60
61    /// Returns the current UNIX timestamp in microseconds (μs).
62    fn timestamp_us(&self) -> u64;
63
64    /// Returns the current UNIX timestamp in milliseconds (ms).
65    fn timestamp_ms(&self) -> u64;
66
67    /// Returns the current UNIX timestamp in seconds.
68    fn timestamp(&self) -> f64;
69
70    /// Returns the names of active timers in the clock.
71    fn timer_names(&self) -> Vec<&str>;
72
73    /// Returns the count of active timers in the clock.
74    fn timer_count(&self) -> usize;
75
76    /// If a timer with the `name` exists.
77    fn timer_exists(&self, name: &Ustr) -> bool;
78
79    /// Register a default event handler for the clock. If a timer
80    /// does not have an event handler, then this handler is used.
81    fn register_default_handler(&mut self, callback: TimeEventCallback);
82
83    /// Get handler for [`TimeEvent`].
84    ///
85    /// Note: Panics if the event does not have an associated handler
86    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
87
88    /// Set a timer to alert at the specified time.
89    ///
90    /// See [`Clock::set_time_alert_ns`] for flag semantics.
91    ///
92    /// # Callback
93    ///
94    /// - `callback`: Some, then callback handles the time event.
95    /// - `callback`: None, then the clock’s default time event callback is used.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if `name` is invalid, `alert_time` is in the past when not allowed,
100    /// or any predicate check fails.
101    #[allow(clippy::too_many_arguments)]
102    fn set_time_alert(
103        &mut self,
104        name: &str,
105        alert_time: DateTime<Utc>,
106        callback: Option<TimeEventCallback>,
107        allow_past: Option<bool>,
108    ) -> anyhow::Result<()> {
109        self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
110    }
111
112    /// Set a timer to alert at the specified time.
113    ///
114    /// Any existing timer registered under the same `name` is cancelled with a warning before the new alert is scheduled.
115    ///
116    /// # Flags
117    ///
118    /// | `allow_past` | Behavior                                                                                |
119    /// |--------------|-----------------------------------------------------------------------------------------|
120    /// | `true`       | If alert time is **in the past**, the alert fires immediately; otherwise at alert time. |
121    /// | `false`      | Returns an error if alert time is earlier than now.                                     |
122    ///
123    /// # Callback
124    ///
125    /// - `callback`: Some, then callback handles the time event.
126    /// - `callback`: None, then the clock’s default time event callback is used.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if `name` is invalid, `alert_time_ns` is earlier than now when not allowed,
131    /// or any predicate check fails.
132    #[allow(clippy::too_many_arguments)]
133    fn set_time_alert_ns(
134        &mut self,
135        name: &str,
136        alert_time_ns: UnixNanos,
137        callback: Option<TimeEventCallback>,
138        allow_past: Option<bool>,
139    ) -> anyhow::Result<()>;
140
141    /// Set a timer to fire time events at every interval between start and stop time.
142    ///
143    /// Any existing timer registered under the same `name` is cancelled with a warning before the new timer is scheduled.
144    ///
145    /// See [`Clock::set_timer_ns`] for flag semantics.
146    ///
147    /// # Callback
148    ///
149    /// - `callback`: Some, then callback handles the time event.
150    /// - `callback`: None, then the clock’s default time event callback is used.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if `name` is invalid, `interval` is not positive,
155    /// or if any predicate check fails.
156    #[allow(clippy::too_many_arguments)]
157    fn set_timer(
158        &mut self,
159        name: &str,
160        interval: Duration,
161        start_time: Option<DateTime<Utc>>,
162        stop_time: Option<DateTime<Utc>>,
163        callback: Option<TimeEventCallback>,
164        allow_past: Option<bool>,
165        fire_immediately: Option<bool>,
166    ) -> anyhow::Result<()> {
167        self.set_timer_ns(
168            name,
169            interval.as_nanos() as u64,
170            start_time.map(UnixNanos::from),
171            stop_time.map(UnixNanos::from),
172            callback,
173            allow_past,
174            fire_immediately,
175        )
176    }
177
178    /// Set a timer to fire time events at every interval between start and stop time.
179    ///
180    /// Any existing timer registered under the same `name` is cancelled before the new timer is scheduled.
181    ///
182    /// # Start Time
183    ///
184    /// - `None` or `Some(0)`: Uses the current time as start time.
185    /// - `Some(non_zero)`: Uses the specified timestamp as start time.
186    ///
187    /// # Flags
188    ///
189    /// | `allow_past` | `fire_immediately` | Behavior                                                                              |
190    /// |--------------|--------------------|---------------------------------------------------------------------------------------|
191    /// | `true`       | `true`             | First event fires immediately at start time, even if start time is in the past.       |
192    /// | `true`       | `false`            | First event fires at start time + interval, even if start time is in the past.        |
193    /// | `false`      | `true`             | Returns error if start time is in the past (first event would be immediate but past). |
194    /// | `false`      | `false`            | Returns error if start time + interval is in the past.                                |
195    ///
196    /// # Callback
197    ///
198    /// - `callback`: Some, then callback handles the time event.
199    /// - `callback`: None, then the clock's default time event callback is used.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if `name` is invalid, `interval_ns` is not positive,
204    /// or if any predicate check fails.
205    #[allow(clippy::too_many_arguments)]
206    fn set_timer_ns(
207        &mut self,
208        name: &str,
209        interval_ns: u64,
210        start_time_ns: Option<UnixNanos>,
211        stop_time_ns: Option<UnixNanos>,
212        callback: Option<TimeEventCallback>,
213        allow_past: Option<bool>,
214        fire_immediately: Option<bool>,
215    ) -> anyhow::Result<()>;
216
217    /// Returns the time interval in which the timer `name` is triggered.
218    ///
219    /// If the timer doesn't exist `None` is returned.
220    fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
221
222    /// Cancels the timer with `name`.
223    fn cancel_timer(&mut self, name: &str);
224
225    /// Cancels all timers.
226    fn cancel_timers(&mut self);
227
228    /// Resets the clock by clearing it's internal state.
229    fn reset(&mut self);
230}
231
/// A static test clock.
///
/// Stores the current timestamp internally which can be advanced.
///
/// # Threading
///
/// This clock is thread-affine; use it only from the thread that created it.
#[derive(Debug)]
pub struct TestClock {
    /// Current clock time; only changes when explicitly set (see `advance_time`).
    time: AtomicTime,
    // Use btree map to ensure stable ordering when scanning for timers in `advance_time`
    /// Timers keyed by timer name.
    timers: BTreeMap<Ustr, TestTimer>,
    /// Fallback callback used for events whose timer has no callback of its own.
    default_callback: Option<TimeEventCallback>,
    /// Timer-specific callbacks keyed by timer name.
    callbacks: HashMap<Ustr, TimeEventCallback>,
    /// Events pushed by `advance_to_time_on_heap` and popped by the `Iterator` impl.
    heap: BinaryHeap<ScheduledTimeEvent>, // TODO: Deprecated - move to global time event heap
}
248
249impl TestClock {
250    /// Creates a new [`TestClock`] instance.
251    #[must_use]
252    pub fn new() -> Self {
253        Self {
254            time: AtomicTime::new(false, UnixNanos::default()),
255            timers: BTreeMap::new(),
256            default_callback: None,
257            callbacks: HashMap::new(),
258            heap: BinaryHeap::new(),
259        }
260    }
261
262    /// Returns a reference to the internal timers for the clock.
263    #[must_use]
264    pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
265        &self.timers
266    }
267
268    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
269    ///
270    /// This function ensures that the clock behaves in a non-decreasing manner. If `set_time` is `true`,
271    /// the internal clock will be updated to the value of `to_time_ns`. Otherwise, the clock will advance
272    /// without explicitly setting the time.
273    ///
274    /// The method processes active timers, advancing them to `to_time_ns`, and collects any `TimeEvent`
275    /// objects that are triggered as a result. Only timers that are not expired are processed.
276    ///
277    /// # Warnings
278    ///
279    /// Logs a warning if >= 1,000,000 time events are allocated during advancement.
280    ///
281    /// # Panics
282    ///
283    /// Panics if `to_time_ns` is less than the current internal clock time.
284    pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
285        const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
286
287        let from_time_ns = self.time.get_time_ns();
288
289        // Time should be non-decreasing
290        assert!(
291            to_time_ns >= from_time_ns,
292            "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
293            from_time_ns
294        );
295
296        if set_time {
297            self.time.set_time(to_time_ns);
298        }
299
300        // Iterate and advance timers and collect events, only retain alive timers
301        let mut events: Vec<TimeEvent> = Vec::new();
302        self.timers.retain(|_, timer| {
303            timer.advance(to_time_ns).for_each(|event| {
304                events.push(event);
305            });
306
307            !timer.is_expired()
308        });
309
310        if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
311            log::warn!(
312                "Allocated {} time events during clock advancement from {} to {}, \
313                 consider stopping the timer between large time ranges with no data points",
314                events.len().separate_with_commas(),
315                from_time_ns,
316                to_time_ns
317            );
318        }
319
320        events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
321        events
322    }
323
324    /// Advances the internal clock to the specified `to_time_ns` and optionally sets the clock to that time.
325    ///
326    /// Pushes the [`TimeEvent`]s on the heap to ensure ordering
327    ///
328    /// Note: `set_time` is not used but present to keep backward compatible api call
329    ///
330    /// # Warnings
331    ///
332    /// Logs a warning when the internal heap already exceeds 100,000 scheduled events before pushing new ones.
333    ///
334    /// # Panics
335    ///
336    /// Panics if `to_time_ns` is less than the current internal clock time.
337    pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
338        const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
339
340        let from_time_ns = self.time.get_time_ns();
341
342        // Time should be non-decreasing
343        assert!(
344            to_time_ns >= from_time_ns,
345            "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
346            from_time_ns
347        );
348
349        self.time.set_time(to_time_ns);
350
351        if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
352            log::warn!(
353                "TestClock heap size {} exceeds recommended limit",
354                self.heap.len()
355            );
356        }
357
358        // Iterate and advance timers and push events to heap. Only retain alive timers.
359        self.timers.retain(|_, timer| {
360            timer.advance(to_time_ns).for_each(|event| {
361                self.heap.push(ScheduledTimeEvent::new(event));
362            });
363
364            !timer.is_expired()
365        });
366    }
367
368    /// Matches `TimeEvent` objects with their corresponding event handlers.
369    ///
370    /// This function takes an `events` vector of `TimeEvent` objects, assumes they are already sorted
371    /// by their `ts_event`, and matches them with the appropriate callback handler from the internal
372    /// registry of callbacks. If no specific callback is found for an event, the default callback is used.
373    ///
374    /// # Panics
375    ///
376    /// Panics if the default callback is not set for the clock when matching handlers.
377    #[must_use]
378    pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
379        events
380            .into_iter()
381            .map(|event| {
382                let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
383                    // If callback_py is None, use the default_callback_py
384                    // TODO: clone for now
385                    self.default_callback
386                        .clone()
387                        .expect("Default callback should exist")
388                });
389                TimeEventHandlerV2::new(event, callback)
390            })
391            .collect()
392    }
393}
394
395impl Iterator for TestClock {
396    type Item = TimeEventHandlerV2;
397
398    fn next(&mut self) -> Option<Self::Item> {
399        self.heap
400            .pop()
401            .map(|event| self.get_handler(event.into_inner()))
402    }
403}
404
405impl Default for TestClock {
406    /// Creates a new default [`TestClock`] instance.
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
412impl Deref for TestClock {
413    type Target = AtomicTime;
414
415    fn deref(&self) -> &Self::Target {
416        &self.time
417    }
418}
419
420impl Clock for TestClock {
421    fn timestamp_ns(&self) -> UnixNanos {
422        self.time.get_time_ns()
423    }
424
425    fn timestamp_us(&self) -> u64 {
426        self.time.get_time_us()
427    }
428
429    fn timestamp_ms(&self) -> u64 {
430        self.time.get_time_ms()
431    }
432
433    fn timestamp(&self) -> f64 {
434        self.time.get_time()
435    }
436
437    fn timer_names(&self) -> Vec<&str> {
438        self.timers
439            .iter()
440            .filter(|(_, timer)| !timer.is_expired())
441            .map(|(k, _)| k.as_str())
442            .collect()
443    }
444
445    fn timer_count(&self) -> usize {
446        self.timers
447            .iter()
448            .filter(|(_, timer)| !timer.is_expired())
449            .count()
450    }
451
452    fn timer_exists(&self, name: &Ustr) -> bool {
453        self.timers.contains_key(name)
454    }
455
456    fn register_default_handler(&mut self, callback: TimeEventCallback) {
457        self.default_callback = Some(callback);
458    }
459
460    /// Returns the handler for the given `TimeEvent`.
461    ///
462    /// # Panics
463    ///
464    /// Panics if no event-specific or default callback has been registered for the event.
465    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
466        // Get the callback from either the event-specific callbacks or default callback
467        let callback = self
468            .callbacks
469            .get(&event.name)
470            .cloned()
471            .or_else(|| self.default_callback.clone())
472            .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
473
474        TimeEventHandlerV2::new(event, callback)
475    }
476
477    fn set_time_alert_ns(
478        &mut self,
479        name: &str,
480        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
481        callback: Option<TimeEventCallback>,
482        allow_past: Option<bool>,
483    ) -> anyhow::Result<()> {
484        check_valid_string_ascii(name, stringify!(name))?;
485
486        let name = Ustr::from(name);
487        let allow_past = allow_past.unwrap_or(true);
488
489        if self.timer_exists(&name) {
490            self.cancel_timer(name.as_str());
491            log::warn!("Timer '{name}' replaced");
492        }
493
494        check_predicate_true(
495            callback.is_some()
496                | self.callbacks.contains_key(&name)
497                | self.default_callback.is_some(),
498            "No callbacks provided",
499        )?;
500
501        match callback {
502            Some(callback_py) => self.callbacks.insert(name, callback_py),
503            None => None,
504        };
505
506        let ts_now = self.get_time_ns();
507
508        if alert_time_ns < ts_now {
509            if allow_past {
510                alert_time_ns = ts_now;
511                log::warn!(
512                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
513                    alert_time_ns.to_rfc3339(),
514                );
515            } else {
516                anyhow::bail!(
517                    "Timer '{name}' alert time {} was in the past (current time is {})",
518                    alert_time_ns.to_rfc3339(),
519                    ts_now.to_rfc3339(),
520                );
521            }
522        }
523
524        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
525        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
526        // When alert time equals current time, fire immediately
527        let fire_immediately = alert_time_ns == ts_now;
528
529        let timer = TestTimer::new(
530            name,
531            interval_ns,
532            ts_now,
533            Some(alert_time_ns),
534            fire_immediately,
535        );
536        self.timers.insert(name, timer);
537
538        Ok(())
539    }
540
541    fn set_timer_ns(
542        &mut self,
543        name: &str,
544        interval_ns: u64,
545        start_time_ns: Option<UnixNanos>,
546        stop_time_ns: Option<UnixNanos>,
547        callback: Option<TimeEventCallback>,
548        allow_past: Option<bool>,
549        fire_immediately: Option<bool>,
550    ) -> anyhow::Result<()> {
551        check_valid_string_ascii(name, stringify!(name))?;
552        check_positive_u64(interval_ns, stringify!(interval_ns))?;
553        check_predicate_true(
554            callback.is_some() | self.default_callback.is_some(),
555            "No callbacks provided",
556        )?;
557
558        let name = Ustr::from(name);
559        let allow_past = allow_past.unwrap_or(true);
560        let fire_immediately = fire_immediately.unwrap_or(false);
561
562        if self.timer_exists(&name) {
563            self.cancel_timer(name.as_str());
564            log::warn!("Timer '{name}' replaced");
565        }
566
567        match callback {
568            Some(callback_py) => self.callbacks.insert(name, callback_py),
569            None => None,
570        };
571
572        let mut start_time_ns = start_time_ns.unwrap_or_default();
573        let ts_now = self.get_time_ns();
574
575        if start_time_ns == 0 {
576            // Zero start time indicates no explicit start; we use the current time
577            start_time_ns = self.timestamp_ns();
578        } else if !allow_past {
579            // Calculate the next event time based on fire_immediately flag
580            let next_event_time = if fire_immediately {
581                start_time_ns
582            } else {
583                start_time_ns + interval_ns
584            };
585
586            // Check if the next event would be in the past
587            if next_event_time < ts_now {
588                anyhow::bail!(
589                    "Timer '{name}' next event time {} would be in the past (current time is {})",
590                    next_event_time.to_rfc3339(),
591                    ts_now.to_rfc3339(),
592                );
593            }
594        }
595
596        if let Some(stop_time) = stop_time_ns {
597            if stop_time <= start_time_ns {
598                anyhow::bail!(
599                    "Timer '{name}' stop time {} must be after start time {}",
600                    stop_time.to_rfc3339(),
601                    start_time_ns.to_rfc3339(),
602                );
603            }
604            if !allow_past && stop_time <= ts_now {
605                anyhow::bail!(
606                    "Timer '{name}' stop time {} is in the past (current time is {})",
607                    stop_time.to_rfc3339(),
608                    ts_now.to_rfc3339(),
609                );
610            }
611        }
612
613        let interval_ns = create_valid_interval(interval_ns);
614
615        let timer = TestTimer::new(
616            name,
617            interval_ns,
618            start_time_ns,
619            stop_time_ns,
620            fire_immediately,
621        );
622        self.timers.insert(name, timer);
623
624        Ok(())
625    }
626
627    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
628        self.timers
629            .get(&Ustr::from(name))
630            .map(|timer| timer.next_time_ns())
631    }
632
633    fn cancel_timer(&mut self, name: &str) {
634        let timer = self.timers.remove(&Ustr::from(name));
635        if let Some(mut timer) = timer {
636            timer.cancel();
637        }
638    }
639
640    fn cancel_timers(&mut self) {
641        for timer in &mut self.timers.values_mut() {
642            timer.cancel();
643        }
644
645        self.timers.clear();
646    }
647
648    fn reset(&mut self) {
649        self.time = AtomicTime::new(false, UnixNanos::default());
650        self.timers = BTreeMap::new();
651        self.heap = BinaryHeap::new();
652        self.callbacks = HashMap::new();
653    }
654}
655
/// A real-time clock which uses system time.
///
/// Timestamps are guaranteed to be unique and monotonically increasing.
///
/// # Threading
///
/// The clock holds thread-local runtime state and must remain on its originating thread.
#[derive(Debug)]
pub struct LiveClock {
    /// `'static` atomic clock obtained from `get_atomic_clock_realtime` in `new`.
    time: &'static AtomicTime,
    /// Timers keyed by timer name.
    timers: HashMap<Ustr, LiveTimer>,
    /// Fallback callback used for events whose timer has no callback of its own.
    default_callback: Option<TimeEventCallback>,
    /// Timer-specific callbacks keyed by timer name.
    callbacks: HashMap<Ustr, TimeEventCallback>,
    /// Optional time event sender, cloned into every `LiveTimer` this clock creates.
    sender: Option<Arc<dyn TimeEventSender>>,
}
671
672impl LiveClock {
673    /// Creates a new [`LiveClock`] instance.
674    #[must_use]
675    pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
676        Self {
677            time: get_atomic_clock_realtime(),
678            timers: HashMap::new(),
679            default_callback: None,
680            callbacks: HashMap::new(),
681            sender,
682        }
683    }
684
685    #[must_use]
686    pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
687        &self.timers
688    }
689
690    // Clean up expired timers. Retain only live ones
691    fn clear_expired_timers(&mut self) {
692        self.timers.retain(|_, timer| !timer.is_expired());
693    }
694}
695
696impl Default for LiveClock {
697    /// Creates a new default [`LiveClock`] instance.
698    fn default() -> Self {
699        Self::new(Some(get_time_event_sender()))
700    }
701}
702
703impl Deref for LiveClock {
704    type Target = AtomicTime;
705
706    fn deref(&self) -> &Self::Target {
707        self.time
708    }
709}
710
711impl Clock for LiveClock {
712    fn timestamp_ns(&self) -> UnixNanos {
713        self.time.get_time_ns()
714    }
715
716    fn timestamp_us(&self) -> u64 {
717        self.time.get_time_us()
718    }
719
720    fn timestamp_ms(&self) -> u64 {
721        self.time.get_time_ms()
722    }
723
724    fn timestamp(&self) -> f64 {
725        self.time.get_time()
726    }
727
728    fn timer_names(&self) -> Vec<&str> {
729        self.timers
730            .iter()
731            .filter(|(_, timer)| !timer.is_expired())
732            .map(|(k, _)| k.as_str())
733            .collect()
734    }
735
736    fn timer_count(&self) -> usize {
737        self.timers
738            .iter()
739            .filter(|(_, timer)| !timer.is_expired())
740            .count()
741    }
742
743    fn timer_exists(&self, name: &Ustr) -> bool {
744        self.timers.contains_key(name)
745    }
746
747    fn register_default_handler(&mut self, handler: TimeEventCallback) {
748        self.default_callback = Some(handler);
749    }
750
751    /// # Panics
752    ///
753    /// This function panics if:
754    /// - The event does not have an associated handler (see trait documentation).
755    #[allow(unused_variables)]
756    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
757        // Get the callback from either the event-specific callbacks or default callback
758        let callback = self
759            .callbacks
760            .get(&event.name)
761            .cloned()
762            .or_else(|| self.default_callback.clone())
763            .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
764
765        TimeEventHandlerV2::new(event, callback)
766    }
767
768    fn set_time_alert_ns(
769        &mut self,
770        name: &str,
771        mut alert_time_ns: UnixNanos, // mut allows adjustment based on allow_past
772        callback: Option<TimeEventCallback>,
773        allow_past: Option<bool>,
774    ) -> anyhow::Result<()> {
775        check_valid_string_ascii(name, stringify!(name))?;
776
777        let name = Ustr::from(name);
778        let allow_past = allow_past.unwrap_or(true);
779
780        if self.timer_exists(&name) {
781            self.cancel_timer(name.as_str());
782            log::warn!("Timer '{name}' replaced");
783        }
784
785        check_predicate_true(
786            callback.is_some()
787                | self.callbacks.contains_key(&name)
788                | self.default_callback.is_some(),
789            "No callbacks provided",
790        )?;
791
792        let callback = if let Some(callback) = callback {
793            self.callbacks.insert(name, callback.clone());
794            callback
795        } else if let Some(existing) = self.callbacks.get(&name) {
796            existing.clone()
797        } else {
798            let default = self
799                .default_callback
800                .clone()
801                .expect("Default callback should exist");
802            self.callbacks.insert(name, default.clone());
803            default
804        };
805
806        let ts_now = self.get_time_ns();
807
808        // Handle past timestamps based on flag
809        if alert_time_ns < ts_now {
810            if allow_past {
811                alert_time_ns = ts_now;
812                log::warn!(
813                    "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
814                    alert_time_ns.to_rfc3339(),
815                );
816            } else {
817                anyhow::bail!(
818                    "Timer '{name}' alert time {} was in the past (current time is {})",
819                    alert_time_ns.to_rfc3339(),
820                    ts_now.to_rfc3339(),
821                );
822            }
823        }
824
825        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
826        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
827
828        let mut timer = LiveTimer::new(
829            name,
830            interval_ns,
831            ts_now,
832            Some(alert_time_ns),
833            callback,
834            false,
835            self.sender.clone(),
836        );
837
838        timer.start();
839
840        self.clear_expired_timers();
841        self.timers.insert(name, timer);
842
843        Ok(())
844    }
845
846    fn set_timer_ns(
847        &mut self,
848        name: &str,
849        interval_ns: u64,
850        start_time_ns: Option<UnixNanos>,
851        stop_time_ns: Option<UnixNanos>,
852        callback: Option<TimeEventCallback>,
853        allow_past: Option<bool>,
854        fire_immediately: Option<bool>,
855    ) -> anyhow::Result<()> {
856        check_valid_string_ascii(name, stringify!(name))?;
857        check_positive_u64(interval_ns, stringify!(interval_ns))?;
858        check_predicate_true(
859            callback.is_some() | self.default_callback.is_some(),
860            "No callbacks provided",
861        )?;
862
863        let name = Ustr::from(name);
864        let allow_past = allow_past.unwrap_or(true);
865        let fire_immediately = fire_immediately.unwrap_or(false);
866
867        if self.timer_exists(&name) {
868            self.cancel_timer(name.as_str());
869            log::warn!("Timer '{name}' replaced");
870        }
871
872        let callback = match callback {
873            Some(callback) => callback,
874            None => self.default_callback.clone().unwrap(),
875        };
876
877        self.callbacks.insert(name, callback.clone());
878
879        let mut start_time_ns = start_time_ns.unwrap_or_default();
880        let ts_now = self.get_time_ns();
881
882        if start_time_ns == 0 {
883            // Zero start time indicates no explicit start; we use the current time
884            start_time_ns = self.timestamp_ns();
885        } else if start_time_ns < ts_now && !allow_past {
886            anyhow::bail!(
887                "Timer '{name}' start time {} was in the past (current time is {})",
888                start_time_ns.to_rfc3339(),
889                ts_now.to_rfc3339(),
890            );
891        }
892
893        if let Some(stop_time) = stop_time_ns {
894            if stop_time <= start_time_ns {
895                anyhow::bail!(
896                    "Timer '{name}' stop time {} must be after start time {}",
897                    stop_time.to_rfc3339(),
898                    start_time_ns.to_rfc3339(),
899                );
900            }
901            if !allow_past && stop_time <= ts_now {
902                anyhow::bail!(
903                    "Timer '{name}' stop time {} is in the past (current time is {})",
904                    stop_time.to_rfc3339(),
905                    ts_now.to_rfc3339(),
906                );
907            }
908        }
909
910        let interval_ns = create_valid_interval(interval_ns);
911
912        let mut timer = LiveTimer::new(
913            name,
914            interval_ns,
915            start_time_ns,
916            stop_time_ns,
917            callback,
918            fire_immediately,
919            self.sender.clone(),
920        );
921        timer.start();
922
923        self.clear_expired_timers();
924        self.timers.insert(name, timer);
925
926        Ok(())
927    }
928
929    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
930        self.timers
931            .get(&Ustr::from(name))
932            .map(|timer| timer.next_time_ns())
933    }
934
935    fn cancel_timer(&mut self, name: &str) {
936        let timer = self.timers.remove(&Ustr::from(name));
937        if let Some(mut timer) = timer {
938            timer.cancel();
939        }
940    }
941
942    fn cancel_timers(&mut self) {
943        for timer in &mut self.timers.values_mut() {
944            timer.cancel();
945        }
946
947        self.timers.clear();
948    }
949
950    fn reset(&mut self) {
951        self.cancel_timers();
952        self.callbacks.clear();
953    }
954}
955
956// Helper struct to stream events from the heap
957#[derive(Debug)]
958pub struct TimeEventStream {
959    heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>,
960}
961
962impl TimeEventStream {
963    pub const fn new(heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
964        Self { heap }
965    }
966}
967
968impl Stream for TimeEventStream {
969    type Item = TimeEvent;
970
971    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972        let mut heap = match self.heap.try_lock() {
973            Ok(guard) => guard,
974            Err(e) => {
975                tracing::error!("Unable to get LiveClock heap lock: {e}");
976                cx.waker().wake_by_ref();
977                return Poll::Pending;
978            }
979        };
980
981        if let Some(event) = heap.pop() {
982            Poll::Ready(Some(event.into_inner()))
983        } else {
984            cx.waker().wake_by_ref();
985            Poll::Pending
986        }
987    }
988}
989
990////////////////////////////////////////////////////////////////////////////////
991// Tests
992////////////////////////////////////////////////////////////////////////////////
993
994#[cfg(test)]
995mod tests {
996    use std::{
997        sync::{Arc, Mutex},
998        time::Duration,
999    };
1000
1001    use nautilus_core::time::get_atomic_clock_realtime;
1002    use rstest::{fixture, rstest};
1003    use ustr::Ustr;
1004
1005    use super::*;
1006    use crate::{runner::TimeEventSender, testing::wait_until};
1007
1008    #[derive(Debug, Default)]
1009    struct TestCallback {
1010        /// Shared flag updated from within the timer callback; Mutex keeps the closure `Send` for tests.
1011        called: Arc<Mutex<bool>>,
1012    }
1013
1014    impl TestCallback {
1015        fn new(called: Arc<Mutex<bool>>) -> Self {
1016            Self { called }
1017        }
1018    }
1019
1020    impl From<TestCallback> for TimeEventCallback {
1021        fn from(callback: TestCallback) -> Self {
1022            TimeEventCallback::from(move |_event: TimeEvent| {
1023                if let Ok(mut called) = callback.called.lock() {
1024                    *called = true;
1025                }
1026            })
1027        }
1028    }
1029
1030    #[derive(Debug)]
1031    struct CollectingSender {
1032        events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1033    }
1034
1035    impl CollectingSender {
1036        fn new(events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>) -> Self {
1037            Self { events }
1038        }
1039    }
1040
1041    impl TimeEventSender for CollectingSender {
1042        fn send(&self, handler: TimeEventHandlerV2) {
1043            let TimeEventHandlerV2 { event, callback } = handler;
1044            let now_ns = get_atomic_clock_realtime().get_time_ns();
1045            let event_clone = event.clone();
1046            callback.call(event);
1047            self.events.lock().unwrap().push((event_clone, now_ns));
1048        }
1049    }
1050
1051    fn wait_for_events(
1052        events: &Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1053        target: usize,
1054        timeout: Duration,
1055    ) {
1056        wait_until(|| events.lock().unwrap().len() >= target, timeout);
1057    }
1058
1059    #[fixture]
1060    pub fn test_clock() -> TestClock {
1061        let mut clock = TestClock::new();
1062        clock.register_default_handler(TestCallback::default().into());
1063        clock
1064    }
1065
1066    #[rstest]
1067    fn test_time_monotonicity(mut test_clock: TestClock) {
1068        let initial_time = test_clock.timestamp_ns();
1069        test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
1070        assert!(test_clock.timestamp_ns() > initial_time);
1071    }
1072
1073    #[rstest]
1074    fn test_timer_registration(mut test_clock: TestClock) {
1075        test_clock
1076            .set_time_alert_ns(
1077                "test_timer",
1078                (*test_clock.timestamp_ns() + 1000).into(),
1079                None,
1080                None,
1081            )
1082            .unwrap();
1083        assert_eq!(test_clock.timer_count(), 1);
1084        assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
1085    }
1086
1087    #[rstest]
1088    fn test_timer_expiration(mut test_clock: TestClock) {
1089        let alert_time = (*test_clock.timestamp_ns() + 1000).into();
1090        test_clock
1091            .set_time_alert_ns("test_timer", alert_time, None, None)
1092            .unwrap();
1093        let events = test_clock.advance_time(alert_time, true);
1094        assert_eq!(events.len(), 1);
1095        assert_eq!(events[0].name.as_str(), "test_timer");
1096    }
1097
1098    #[rstest]
1099    fn test_timer_cancellation(mut test_clock: TestClock) {
1100        test_clock
1101            .set_time_alert_ns(
1102                "test_timer",
1103                (*test_clock.timestamp_ns() + 1000).into(),
1104                None,
1105                None,
1106            )
1107            .unwrap();
1108        assert_eq!(test_clock.timer_count(), 1);
1109        test_clock.cancel_timer("test_timer");
1110        assert_eq!(test_clock.timer_count(), 0);
1111    }
1112
1113    #[rstest]
1114    fn test_time_advancement(mut test_clock: TestClock) {
1115        let start_time = test_clock.timestamp_ns();
1116        test_clock
1117            .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
1118            .unwrap();
1119        let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
1120        assert_eq!(events.len(), 2);
1121        assert_eq!(*events[0].ts_event, *start_time + 1000);
1122        assert_eq!(*events[1].ts_event, *start_time + 2000);
1123    }
1124
1125    #[rstest]
1126    fn test_default_and_custom_callbacks() {
1127        let mut clock = TestClock::new();
1128        let default_called = Arc::new(Mutex::new(false));
1129        let custom_called = Arc::new(Mutex::new(false));
1130
1131        let default_callback = TestCallback::new(Arc::clone(&default_called));
1132        let custom_callback = TestCallback::new(Arc::clone(&custom_called));
1133
1134        clock.register_default_handler(TimeEventCallback::from(default_callback));
1135        clock
1136            .set_time_alert_ns(
1137                "default_timer",
1138                (*clock.timestamp_ns() + 1000).into(),
1139                None,
1140                None,
1141            )
1142            .unwrap();
1143        clock
1144            .set_time_alert_ns(
1145                "custom_timer",
1146                (*clock.timestamp_ns() + 1000).into(),
1147                Some(TimeEventCallback::from(custom_callback)),
1148                None,
1149            )
1150            .unwrap();
1151
1152        let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
1153        let handlers = clock.match_handlers(events);
1154
1155        for handler in handlers {
1156            handler.callback.call(handler.event);
1157        }
1158
1159        assert!(*default_called.lock().unwrap());
1160        assert!(*custom_called.lock().unwrap());
1161    }
1162
1163    #[rstest]
1164    fn test_multiple_timers(mut test_clock: TestClock) {
1165        let start_time = test_clock.timestamp_ns();
1166        test_clock
1167            .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1168            .unwrap();
1169        test_clock
1170            .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1171            .unwrap();
1172        let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
1173        assert_eq!(events.len(), 3);
1174        assert_eq!(events[0].name.as_str(), "timer1");
1175        assert_eq!(events[1].name.as_str(), "timer1");
1176        assert_eq!(events[2].name.as_str(), "timer2");
1177    }
1178
1179    #[rstest]
1180    fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1181        test_clock.set_time(UnixNanos::from(2000));
1182        let current_time = test_clock.timestamp_ns();
1183        let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1184
1185        // With allow_past=true (default), should adjust to current time and succeed
1186        test_clock
1187            .set_time_alert_ns("past_timer", past_time, None, Some(true))
1188            .unwrap();
1189
1190        // Verify timer was created with adjusted time
1191        assert_eq!(test_clock.timer_count(), 1);
1192        assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1193
1194        // Next time should be at or after current time, not in the past
1195        let next_time = test_clock.next_time_ns("past_timer").unwrap();
1196        assert!(next_time >= current_time);
1197    }
1198
1199    #[rstest]
1200    fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1201        test_clock.set_time(UnixNanos::from(2000));
1202        let current_time = test_clock.timestamp_ns();
1203        let past_time = current_time - 1000;
1204
1205        // With allow_past=false, should fail for past times
1206        let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1207
1208        // Verify the operation failed with appropriate error
1209        assert!(result.is_err());
1210        assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1211
1212        // Verify no timer was created
1213        assert_eq!(test_clock.timer_count(), 0);
1214        assert!(test_clock.timer_names().is_empty());
1215    }
1216
1217    #[rstest]
1218    fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1219        test_clock.set_time(UnixNanos::from(2000));
1220        let current_time = test_clock.timestamp_ns();
1221        let start_time = current_time + 1000;
1222        let stop_time = current_time + 500; // Stop time before start time
1223
1224        // Should fail because stop_time < start_time
1225        let result = test_clock.set_timer_ns(
1226            "invalid_timer",
1227            100,
1228            Some(start_time),
1229            Some(stop_time),
1230            None,
1231            None,
1232            None,
1233        );
1234
1235        // Verify the operation failed with appropriate error
1236        assert!(result.is_err());
1237        assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1238
1239        // Verify no timer was created
1240        assert_eq!(test_clock.timer_count(), 0);
1241    }
1242
1243    #[rstest]
1244    fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1245        let start_time = test_clock.timestamp_ns();
1246        let interval_ns = 1000;
1247
1248        test_clock
1249            .set_timer_ns(
1250                "fire_immediately_timer",
1251                interval_ns,
1252                Some(start_time),
1253                None,
1254                None,
1255                None,
1256                Some(true),
1257            )
1258            .unwrap();
1259
1260        // Advance time to check immediate firing and subsequent intervals
1261        let events = test_clock.advance_time(start_time + 2500, true);
1262
1263        // Should fire immediately at start_time (0), then at start_time+1000, then at start_time+2000
1264        assert_eq!(events.len(), 3);
1265        assert_eq!(*events[0].ts_event, *start_time); // Fires immediately
1266        assert_eq!(*events[1].ts_event, *start_time + 1000); // Then after interval
1267        assert_eq!(*events[2].ts_event, *start_time + 2000); // Then after second interval
1268    }
1269
1270    #[rstest]
1271    fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1272        let start_time = test_clock.timestamp_ns();
1273        let interval_ns = 1000;
1274
1275        test_clock
1276            .set_timer_ns(
1277                "normal_timer",
1278                interval_ns,
1279                Some(start_time),
1280                None,
1281                None,
1282                None,
1283                Some(false),
1284            )
1285            .unwrap();
1286
1287        // Advance time to check normal behavior
1288        let events = test_clock.advance_time(start_time + 2500, true);
1289
1290        // Should fire after first interval, not immediately
1291        assert_eq!(events.len(), 2);
1292        assert_eq!(*events[0].ts_event, *start_time + 1000); // Fires after first interval
1293        assert_eq!(*events[1].ts_event, *start_time + 2000); // Then after second interval
1294    }
1295
1296    #[rstest]
1297    fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1298        let start_time = test_clock.timestamp_ns();
1299        let interval_ns = 1000;
1300
1301        // Don't specify fire_immediately (should default to false)
1302        test_clock
1303            .set_timer_ns(
1304                "default_timer",
1305                interval_ns,
1306                Some(start_time),
1307                None,
1308                None,
1309                None,
1310                None,
1311            )
1312            .unwrap();
1313
1314        let events = test_clock.advance_time(start_time + 1500, true);
1315
1316        // Should behave the same as fire_immediately=false
1317        assert_eq!(events.len(), 1);
1318        assert_eq!(*events[0].ts_event, *start_time + 1000); // Fires after first interval
1319    }
1320
1321    #[rstest]
1322    fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1323        test_clock.set_time(5000.into());
1324        let interval_ns = 1000;
1325
1326        test_clock
1327            .set_timer_ns(
1328                "zero_start_timer",
1329                interval_ns,
1330                None,
1331                None,
1332                None,
1333                None,
1334                Some(true),
1335            )
1336            .unwrap();
1337
1338        let events = test_clock.advance_time(UnixNanos::from(7000), true);
1339
1340        // With zero start time, should use current time as start
1341        // Fire immediately at current time (5000), then at 6000, 7000
1342        assert_eq!(events.len(), 3);
1343        assert_eq!(*events[0].ts_event, 5000); // Immediate fire at current time
1344        assert_eq!(*events[1].ts_event, 6000);
1345        assert_eq!(*events[2].ts_event, 7000);
1346    }
1347
1348    #[rstest]
1349    fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1350        let start_time = test_clock.timestamp_ns();
1351        let interval_ns = 1000;
1352
1353        // One timer with fire_immediately=true
1354        test_clock
1355            .set_timer_ns(
1356                "immediate_timer",
1357                interval_ns,
1358                Some(start_time),
1359                None,
1360                None,
1361                None,
1362                Some(true),
1363            )
1364            .unwrap();
1365
1366        // One timer with fire_immediately=false
1367        test_clock
1368            .set_timer_ns(
1369                "normal_timer",
1370                interval_ns,
1371                Some(start_time),
1372                None,
1373                None,
1374                None,
1375                Some(false),
1376            )
1377            .unwrap();
1378
1379        let events = test_clock.advance_time(start_time + 1500, true);
1380
1381        // Should have 3 events total: immediate_timer fires at start & 1000, normal_timer fires at 1000
1382        assert_eq!(events.len(), 3);
1383
1384        // Sort events by timestamp to check order
1385        let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1386        event_times.sort();
1387
1388        assert_eq!(event_times[0], start_time.as_u64()); // immediate_timer fires immediately
1389        assert_eq!(event_times[1], start_time.as_u64() + 1000); // both timers fire at 1000
1390        assert_eq!(event_times[2], start_time.as_u64() + 1000); // both timers fire at 1000
1391    }
1392
1393    #[rstest]
1394    fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1395        let start_time = test_clock.timestamp_ns();
1396
1397        // Set first timer
1398        test_clock
1399            .set_timer_ns(
1400                "collision_timer",
1401                1000,
1402                Some(start_time),
1403                None,
1404                None,
1405                None,
1406                None,
1407            )
1408            .unwrap();
1409
1410        // Setting timer with same name should overwrite the existing one
1411        let result = test_clock.set_timer_ns(
1412            "collision_timer",
1413            2000,
1414            Some(start_time),
1415            None,
1416            None,
1417            None,
1418            None,
1419        );
1420
1421        assert!(result.is_ok());
1422        // Should still only have one timer (overwritten)
1423        assert_eq!(test_clock.timer_count(), 1);
1424
1425        // The timer should have the new interval
1426        let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1427        // With interval 2000 and start at start_time, next time should be start_time + 2000
1428        assert_eq!(next_time, start_time + 2000);
1429    }
1430
1431    #[rstest]
1432    fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1433        let start_time = test_clock.timestamp_ns();
1434
1435        // Attempt to set timer with zero interval should fail
1436        let result =
1437            test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1438
1439        assert!(result.is_err());
1440        assert_eq!(test_clock.timer_count(), 0);
1441    }
1442
1443    #[rstest]
1444    fn test_timer_empty_name_error(mut test_clock: TestClock) {
1445        let start_time = test_clock.timestamp_ns();
1446
1447        // Attempt to set timer with empty name should fail
1448        let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1449
1450        assert!(result.is_err());
1451        assert_eq!(test_clock.timer_count(), 0);
1452    }
1453
1454    #[rstest]
1455    fn test_timer_exists(mut test_clock: TestClock) {
1456        let name = Ustr::from("exists_timer");
1457        assert!(!test_clock.timer_exists(&name));
1458
1459        test_clock
1460            .set_time_alert_ns(
1461                name.as_str(),
1462                (*test_clock.timestamp_ns() + 1_000).into(),
1463                None,
1464                None,
1465            )
1466            .unwrap();
1467
1468        assert!(test_clock.timer_exists(&name));
1469    }
1470
1471    #[rstest]
1472    fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1473        test_clock.set_time(UnixNanos::from(10_000));
1474        let current = test_clock.timestamp_ns();
1475
1476        let result = test_clock.set_timer_ns(
1477            "past_stop",
1478            10_000,
1479            Some(current - 500),
1480            Some(current - 100),
1481            None,
1482            Some(false),
1483            None,
1484        );
1485
1486        let err = result.expect_err("expected stop time validation error");
1487        let err_msg = err.to_string();
1488        assert!(err_msg.contains("stop time"));
1489        assert!(err_msg.contains("in the past"));
1490    }
1491
1492    #[rstest]
1493    fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1494        let current = test_clock.timestamp_ns();
1495
1496        let result = test_clock.set_timer_ns(
1497            "future_stop",
1498            1_000,
1499            Some(current),
1500            Some(current + 10_000),
1501            None,
1502            Some(false),
1503            None,
1504        );
1505
1506        assert!(result.is_ok());
1507    }
1508
1509    #[rstest]
1510    fn test_live_clock_timer_replacement_cancels_previous_task() {
1511        let events = Arc::new(Mutex::new(Vec::new()));
1512        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1513
1514        let mut clock = LiveClock::new(Some(sender));
1515        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1516
1517        let fast_interval = Duration::from_millis(10).as_nanos() as u64;
1518        clock
1519            .set_timer_ns("replace", fast_interval, None, None, None, None, None)
1520            .unwrap();
1521
1522        wait_for_events(&events, 2, Duration::from_millis(200));
1523        events.lock().unwrap().clear();
1524
1525        let slow_interval = Duration::from_millis(30).as_nanos() as u64;
1526        clock
1527            .set_timer_ns("replace", slow_interval, None, None, None, None, None)
1528            .unwrap();
1529
1530        wait_for_events(&events, 3, Duration::from_millis(300));
1531
1532        let snapshot = events.lock().unwrap().clone();
1533        let diffs: Vec<u64> = snapshot
1534            .windows(2)
1535            .map(|pair| pair[1].0.ts_event.as_u64() - pair[0].0.ts_event.as_u64())
1536            .collect();
1537
1538        assert!(!diffs.is_empty());
1539        for diff in diffs {
1540            assert_ne!(diff, fast_interval);
1541        }
1542
1543        clock.cancel_timers();
1544    }
1545
1546    #[rstest]
1547    fn test_live_clock_time_alert_persists_callback() {
1548        let events = Arc::new(Mutex::new(Vec::new()));
1549        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1550
1551        let mut clock = LiveClock::new(Some(sender));
1552        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1553
1554        let now = clock.timestamp_ns();
1555        let alert_time = now + 1_000_u64;
1556
1557        clock
1558            .set_time_alert_ns("alert-callback", alert_time, None, None)
1559            .unwrap();
1560
1561        assert!(clock.callbacks.contains_key(&Ustr::from("alert-callback")));
1562
1563        clock.cancel_timers();
1564    }
1565
1566    #[rstest]
1567    fn test_live_clock_reset_stops_active_timers() {
1568        let events = Arc::new(Mutex::new(Vec::new()));
1569        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1570
1571        let mut clock = LiveClock::new(Some(sender));
1572        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1573
1574        clock
1575            .set_timer_ns(
1576                "reset-test",
1577                Duration::from_millis(15).as_nanos() as u64,
1578                None,
1579                None,
1580                None,
1581                None,
1582                None,
1583            )
1584            .unwrap();
1585
1586        wait_for_events(&events, 2, Duration::from_millis(250));
1587
1588        clock.reset();
1589
1590        // Wait for any in-flight events to arrive
1591        let start = std::time::Instant::now();
1592        wait_until(
1593            || start.elapsed() >= Duration::from_millis(50),
1594            Duration::from_secs(2),
1595        );
1596
1597        // Clear any events that arrived before reset took effect
1598        events.lock().unwrap().clear();
1599
1600        // Verify no new events arrive (timer should be stopped)
1601        let start = std::time::Instant::now();
1602        wait_until(
1603            || start.elapsed() >= Duration::from_millis(50),
1604            Duration::from_secs(2),
1605        );
1606        assert!(events.lock().unwrap().is_empty());
1607    }
1608
1609    #[rstest]
1610    fn test_live_timer_short_delay_not_early() {
1611        let events = Arc::new(Mutex::new(Vec::new()));
1612        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1613
1614        let mut clock = LiveClock::new(Some(sender));
1615        clock.register_default_handler(TimeEventCallback::from(|_| {}));
1616
1617        let now = clock.timestamp_ns();
1618        let start_time = UnixNanos::from(*now + 500_000); // 0.5 ms in the future
1619        let interval_ns = 1_000_000;
1620
1621        clock
1622            .set_timer_ns(
1623                "short-delay",
1624                interval_ns,
1625                Some(start_time),
1626                None,
1627                None,
1628                None,
1629                Some(true),
1630            )
1631            .unwrap();
1632
1633        wait_for_events(&events, 1, Duration::from_millis(100));
1634
1635        let snapshot = events.lock().unwrap().clone();
1636        assert!(!snapshot.is_empty());
1637
1638        for (event, actual_ts) in &snapshot {
1639            assert!(actual_ts.as_u64() >= event.ts_event.as_u64());
1640        }
1641
1642        clock.cancel_timers();
1643    }
1644
1645    #[rstest]
1646    fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1647        let start_time = test_clock.timestamp_ns();
1648        let interval_ns = 1000;
1649        let stop_time = start_time + interval_ns; // Stop exactly at first interval
1650
1651        test_clock
1652            .set_timer_ns(
1653                "exact_stop",
1654                interval_ns,
1655                Some(start_time),
1656                Some(stop_time),
1657                None,
1658                None,
1659                Some(true),
1660            )
1661            .unwrap();
1662
1663        let events = test_clock.advance_time(stop_time, true);
1664
1665        // Should fire immediately at start, then at stop time (which equals first interval)
1666        assert_eq!(events.len(), 2);
1667        assert_eq!(*events[0].ts_event, *start_time); // Immediate fire
1668        assert_eq!(*events[1].ts_event, *stop_time); // Fire at stop time
1669    }
1670
1671    #[rstest]
1672    fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1673        let start_time = test_clock.timestamp_ns();
1674        let interval_ns = 1000;
1675
1676        test_clock
1677            .set_timer_ns(
1678                "exact_advance",
1679                interval_ns,
1680                Some(start_time),
1681                None,
1682                None,
1683                None,
1684                Some(false),
1685            )
1686            .unwrap();
1687
1688        // Advance to exactly the next fire time
1689        let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1690        let events = test_clock.advance_time(next_time, true);
1691
1692        assert_eq!(events.len(), 1);
1693        assert_eq!(*events[0].ts_event, *next_time);
1694    }
1695
1696    #[rstest]
1697    fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1698        // Simulate bar aggregation scenario: current time is in middle of a bar window
1699        test_clock.set_time(UnixNanos::from(100_500)); // 100.5 seconds
1700
1701        let bar_start_time = UnixNanos::from(100_000); // 100 seconds (0.5 sec ago)
1702        let interval_ns = 1000; // 1 second bars
1703
1704        // With allow_past=false and fire_immediately=false:
1705        // start_time is in past (100 sec) but next event (101 sec) is in future
1706        // This should be ALLOWED for bar aggregation
1707        let result = test_clock.set_timer_ns(
1708            "bar_timer",
1709            interval_ns,
1710            Some(bar_start_time),
1711            None,
1712            None,
1713            Some(false), // allow_past = false
1714            Some(false), // fire_immediately = false
1715        );
1716
1717        // Should succeed because next event time (100_000 + 1000 = 101_000) > current time (100_500)
1718        assert!(result.is_ok());
1719        assert_eq!(test_clock.timer_count(), 1);
1720
1721        // Next event should be at bar_start_time + interval = 101_000
1722        let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1723        assert_eq!(*next_time, 101_000);
1724    }
1725
1726    #[rstest]
1727    fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1728        test_clock.set_time(UnixNanos::from(102_000)); // 102 seconds
1729
1730        let past_start_time = UnixNanos::from(100_000); // 100 seconds (2 sec ago)
1731        let interval_ns = 1000; // 1 second interval
1732
1733        // With allow_past=false and fire_immediately=false:
1734        // Next event would be 100_000 + 1000 = 101_000, which is < current time (102_000)
1735        // This should be REJECTED
1736        let result = test_clock.set_timer_ns(
1737            "past_event_timer",
1738            interval_ns,
1739            Some(past_start_time),
1740            None,
1741            None,
1742            Some(false), // allow_past = false
1743            Some(false), // fire_immediately = false
1744        );
1745
1746        // Should fail because next event time (101_000) < current time (102_000)
1747        assert!(result.is_err());
1748        assert!(
1749            result
1750                .unwrap_err()
1751                .to_string()
1752                .contains("would be in the past")
1753        );
1754    }
1755
1756    #[rstest]
1757    fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1758        test_clock.set_time(UnixNanos::from(100_500)); // 100.5 seconds
1759
1760        let past_start_time = UnixNanos::from(100_000); // 100 seconds (0.5 sec ago)
1761        let interval_ns = 1000;
1762
1763        // With fire_immediately=true, next event = start_time (which is in past)
1764        // This should be REJECTED with allow_past=false
1765        let result = test_clock.set_timer_ns(
1766            "immediate_past_timer",
1767            interval_ns,
1768            Some(past_start_time),
1769            None,
1770            None,
1771            Some(false), // allow_past = false
1772            Some(true),  // fire_immediately = true
1773        );
1774
1775        // Should fail because next event time (100_000) < current time (100_500)
1776        assert!(result.is_err());
1777        assert!(
1778            result
1779                .unwrap_err()
1780                .to_string()
1781                .contains("would be in the past")
1782        );
1783    }
1784
1785    #[rstest]
1786    fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1787        let start_time = test_clock.timestamp_ns();
1788
1789        test_clock
1790            .set_timer_ns(
1791                "cancel_test",
1792                1000,
1793                Some(start_time),
1794                None,
1795                None,
1796                None,
1797                None,
1798            )
1799            .unwrap();
1800
1801        assert_eq!(test_clock.timer_count(), 1);
1802
1803        // Cancel the timer
1804        test_clock.cancel_timer("cancel_test");
1805
1806        assert_eq!(test_clock.timer_count(), 0);
1807
1808        // Advance time - should get no events from cancelled timer
1809        let events = test_clock.advance_time(start_time + 2000, true);
1810        assert_eq!(events.len(), 0);
1811    }
1812
1813    #[rstest]
1814    fn test_cancel_all_timers(mut test_clock: TestClock) {
1815        // Create multiple timers
1816        test_clock
1817            .set_timer_ns("timer1", 1000, None, None, None, None, None)
1818            .unwrap();
1819        test_clock
1820            .set_timer_ns("timer2", 1500, None, None, None, None, None)
1821            .unwrap();
1822        test_clock
1823            .set_timer_ns("timer3", 2000, None, None, None, None, None)
1824            .unwrap();
1825
1826        assert_eq!(test_clock.timer_count(), 3);
1827
1828        // Cancel all timers
1829        test_clock.cancel_timers();
1830
1831        assert_eq!(test_clock.timer_count(), 0);
1832
1833        // Advance time - should get no events
1834        let events = test_clock.advance_time(UnixNanos::from(5000), true);
1835        assert_eq!(events.len(), 0);
1836    }
1837
1838    #[rstest]
1839    fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1840        test_clock
1841            .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1842            .unwrap();
1843
1844        assert_eq!(test_clock.timer_count(), 1);
1845
1846        // Reset the clock
1847        test_clock.reset();
1848
1849        assert_eq!(test_clock.timer_count(), 0);
1850        assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); // Time reset to zero
1851    }
1852
1853    #[rstest]
1854    fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1855        let current_time = test_clock.utc_now();
1856        let alert_time = current_time + chrono::Duration::seconds(1);
1857
1858        // Test the default implementation that delegates to set_time_alert_ns
1859        test_clock
1860            .set_time_alert("alert_test", alert_time, None, None)
1861            .unwrap();
1862
1863        assert_eq!(test_clock.timer_count(), 1);
1864        assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1865
1866        // Verify the timer is set for the correct time
1867        let expected_ns = UnixNanos::from(alert_time);
1868        let next_time = test_clock.next_time_ns("alert_test").unwrap();
1869
1870        // Should be very close (within a few nanoseconds due to conversion)
1871        let diff = if next_time >= expected_ns {
1872            next_time.as_u64() - expected_ns.as_u64()
1873        } else {
1874            expected_ns.as_u64() - next_time.as_u64()
1875        };
1876        assert!(
1877            diff < 1000,
1878            "Timer should be set within 1 microsecond of expected time"
1879        );
1880    }
1881
1882    #[rstest]
1883    fn test_set_timer_default_impl(mut test_clock: TestClock) {
1884        let current_time = test_clock.utc_now();
1885        let start_time = current_time + chrono::Duration::seconds(1);
1886        let interval = Duration::from_millis(500);
1887
1888        // Test the default implementation that delegates to set_timer_ns
1889        test_clock
1890            .set_timer(
1891                "timer_test",
1892                interval,
1893                Some(start_time),
1894                None,
1895                None,
1896                None,
1897                None,
1898            )
1899            .unwrap();
1900
1901        assert_eq!(test_clock.timer_count(), 1);
1902        assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1903
1904        // Advance time and verify timer fires at correct intervals
1905        let start_ns = UnixNanos::from(start_time);
1906        let interval_ns = interval.as_nanos() as u64;
1907
1908        let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1909        assert_eq!(events.len(), 3); // Should fire 3 times
1910
1911        // Verify timing
1912        assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1913        assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1914        assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1915    }
1916
1917    #[rstest]
1918    fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1919        let current_time = test_clock.utc_now();
1920        let start_time = current_time + chrono::Duration::seconds(1);
1921        let stop_time = current_time + chrono::Duration::seconds(3);
1922        let interval = Duration::from_secs(1);
1923
1924        // Test with stop time
1925        test_clock
1926            .set_timer(
1927                "timer_with_stop",
1928                interval,
1929                Some(start_time),
1930                Some(stop_time),
1931                None,
1932                None,
1933                None,
1934            )
1935            .unwrap();
1936
1937        assert_eq!(test_clock.timer_count(), 1);
1938
1939        // Advance beyond stop time
1940        let stop_ns = UnixNanos::from(stop_time);
1941        let events = test_clock.advance_time(stop_ns + 1000, true);
1942
1943        // Should fire twice: at start_time + 1s and start_time + 2s, but not at start_time + 3s since that would be at stop_time
1944        assert_eq!(events.len(), 2);
1945
1946        let start_ns = UnixNanos::from(start_time);
1947        let interval_ns = interval.as_nanos() as u64;
1948        assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1949        assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1950    }
1951
1952    #[rstest]
1953    fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1954        let current_time = test_clock.utc_now();
1955        let start_time = current_time + chrono::Duration::seconds(1);
1956        let interval = Duration::from_millis(500);
1957
1958        // Test with fire_immediately=true
1959        test_clock
1960            .set_timer(
1961                "immediate_timer",
1962                interval,
1963                Some(start_time),
1964                None,
1965                None,
1966                None,
1967                Some(true),
1968            )
1969            .unwrap();
1970
1971        let start_ns = UnixNanos::from(start_time);
1972        let interval_ns = interval.as_nanos() as u64;
1973
1974        // Advance to start time + 1 interval
1975        let events = test_clock.advance_time(start_ns + interval_ns, true);
1976
1977        // Should fire immediately at start_time, then again at start_time + interval
1978        assert_eq!(events.len(), 2);
1979        assert_eq!(*events[0].ts_event, *start_ns); // Immediate fire
1980        assert_eq!(*events[1].ts_event, *start_ns + interval_ns); // Regular interval
1981    }
1982
1983    #[rstest]
1984    fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1985        let current_time = test_clock.timestamp_ns();
1986
1987        // Set time alert for exactly the current time
1988        test_clock
1989            .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1990            .unwrap();
1991
1992        assert_eq!(test_clock.timer_count(), 1);
1993
1994        // Advance time by exactly 0 (to current time) - should fire immediately
1995        let events = test_clock.advance_time(current_time, true);
1996
1997        // Should fire immediately since alert_time_ns == ts_now
1998        assert_eq!(events.len(), 1);
1999        assert_eq!(events[0].name.as_str(), "alert_at_current_time");
2000        assert_eq!(*events[0].ts_event, *current_time);
2001    }
2002}