1use std::{
19 any::Any,
20 collections::{BTreeMap, BinaryHeap},
21 fmt::Debug,
22 ops::Deref,
23 time::Duration,
24};
25
26use ahash::AHashMap;
27use chrono::{DateTime, Utc};
28use nautilus_core::{
29 AtomicTime, UnixNanos,
30 correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
31 formatting::Separable,
32};
33use ustr::Ustr;
34
35use crate::timer::{
36 ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
37 create_valid_interval,
38};
39
40pub trait Clock: Debug + Any {
46 fn utc_now(&self) -> DateTime<Utc> {
48 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
49 }
50
51 fn timestamp_ns(&self) -> UnixNanos;
53
54 fn timestamp_us(&self) -> u64;
56
57 fn timestamp_ms(&self) -> u64;
59
60 fn timestamp(&self) -> f64;
62
63 fn timer_names(&self) -> Vec<&str>;
65
66 fn timer_count(&self) -> usize;
68
69 fn timer_exists(&self, name: &Ustr) -> bool;
71
72 fn register_default_handler(&mut self, callback: TimeEventCallback);
75
76 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
80
81 #[allow(clippy::too_many_arguments)]
95 fn set_time_alert(
96 &mut self,
97 name: &str,
98 alert_time: DateTime<Utc>,
99 callback: Option<TimeEventCallback>,
100 allow_past: Option<bool>,
101 ) -> anyhow::Result<()> {
102 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
103 }
104
105 #[allow(clippy::too_many_arguments)]
126 fn set_time_alert_ns(
127 &mut self,
128 name: &str,
129 alert_time_ns: UnixNanos,
130 callback: Option<TimeEventCallback>,
131 allow_past: Option<bool>,
132 ) -> anyhow::Result<()>;
133
134 #[allow(clippy::too_many_arguments)]
150 fn set_timer(
151 &mut self,
152 name: &str,
153 interval: Duration,
154 start_time: Option<DateTime<Utc>>,
155 stop_time: Option<DateTime<Utc>>,
156 callback: Option<TimeEventCallback>,
157 allow_past: Option<bool>,
158 fire_immediately: Option<bool>,
159 ) -> anyhow::Result<()> {
160 self.set_timer_ns(
161 name,
162 interval.as_nanos() as u64,
163 start_time.map(UnixNanos::from),
164 stop_time.map(UnixNanos::from),
165 callback,
166 allow_past,
167 fire_immediately,
168 )
169 }
170
171 #[allow(clippy::too_many_arguments)]
199 fn set_timer_ns(
200 &mut self,
201 name: &str,
202 interval_ns: u64,
203 start_time_ns: Option<UnixNanos>,
204 stop_time_ns: Option<UnixNanos>,
205 callback: Option<TimeEventCallback>,
206 allow_past: Option<bool>,
207 fire_immediately: Option<bool>,
208 ) -> anyhow::Result<()>;
209
210 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
214
215 fn cancel_timer(&mut self, name: &str);
217
218 fn cancel_timers(&mut self);
220
221 fn reset(&mut self);
223}
224
225impl dyn Clock {
226 pub fn as_any(&self) -> &dyn std::any::Any {
228 self
229 }
230 pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
232 self
233 }
234}
235
236#[derive(Debug, Default)]
241pub struct CallbackRegistry {
242 default_callback: Option<TimeEventCallback>,
243 callbacks: AHashMap<Ustr, TimeEventCallback>,
244}
245
246impl CallbackRegistry {
247 #[must_use]
249 pub fn new() -> Self {
250 Self {
251 default_callback: None,
252 callbacks: AHashMap::new(),
253 }
254 }
255
256 pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
258 self.default_callback = Some(callback);
259 }
260
261 pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
263 self.callbacks.insert(name, callback);
264 }
265
266 #[must_use]
268 pub fn has_any_callback(&self, name: &Ustr) -> bool {
269 self.callbacks.contains_key(name) || self.default_callback.is_some()
270 }
271
272 #[must_use]
274 pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
275 self.callbacks
276 .get(name)
277 .cloned()
278 .or_else(|| self.default_callback.clone())
279 }
280
281 #[must_use]
287 pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
288 let callback = self
289 .get_callback(&event.name)
290 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
291
292 TimeEventHandlerV2::new(event, callback)
293 }
294
295 pub fn clear(&mut self) {
297 self.callbacks.clear();
298 }
299}
300
301pub fn validate_and_prepare_time_alert(
309 name: &str,
310 mut alert_time_ns: UnixNanos,
311 allow_past: Option<bool>,
312 ts_now: UnixNanos,
313) -> anyhow::Result<(Ustr, UnixNanos)> {
314 check_valid_string_utf8(name, stringify!(name))?;
315
316 let name = Ustr::from(name);
317 let allow_past = allow_past.unwrap_or(true);
318
319 if alert_time_ns < ts_now {
320 if allow_past {
321 alert_time_ns = ts_now;
322 log::warn!(
323 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
324 alert_time_ns.to_rfc3339(),
325 );
326 } else {
327 anyhow::bail!(
328 "Timer '{name}' alert time {} was in the past (current time is {ts_now})",
329 alert_time_ns.to_rfc3339(),
330 );
331 }
332 }
333
334 Ok((name, alert_time_ns))
335}
336
337pub fn validate_and_prepare_timer(
346 name: &str,
347 interval_ns: u64,
348 start_time_ns: Option<UnixNanos>,
349 stop_time_ns: Option<UnixNanos>,
350 allow_past: Option<bool>,
351 fire_immediately: Option<bool>,
352 ts_now: UnixNanos,
353) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
354 check_valid_string_utf8(name, stringify!(name))?;
355 check_positive_u64(interval_ns, stringify!(interval_ns))?;
356
357 let name = Ustr::from(name);
358 let allow_past = allow_past.unwrap_or(true);
359 let fire_immediately = fire_immediately.unwrap_or(false);
360
361 let mut start_time_ns = start_time_ns.unwrap_or_default();
362
363 if start_time_ns == 0 {
364 start_time_ns = ts_now;
366 } else if !allow_past {
367 let next_event_time = if fire_immediately {
368 start_time_ns
369 } else {
370 start_time_ns + interval_ns
371 };
372
373 if next_event_time < ts_now {
374 anyhow::bail!(
375 "Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
376 next_event_time.to_rfc3339(),
377 );
378 }
379 }
380
381 if let Some(stop_time) = stop_time_ns {
382 if stop_time <= start_time_ns {
383 anyhow::bail!(
384 "Timer '{name}' stop time {} must be after start time {}",
385 stop_time.to_rfc3339(),
386 start_time_ns.to_rfc3339(),
387 );
388 }
389 if !allow_past && stop_time <= ts_now {
390 anyhow::bail!(
391 "Timer '{name}' stop time {} is in the past (current time is {ts_now})",
392 stop_time.to_rfc3339(),
393 );
394 }
395 }
396
397 Ok((
398 name,
399 start_time_ns,
400 stop_time_ns,
401 allow_past,
402 fire_immediately,
403 ))
404}
405
406#[derive(Debug)]
414pub struct TestClock {
415 time: AtomicTime,
416 timers: BTreeMap<Ustr, TestTimer>,
418 callbacks: CallbackRegistry,
419 heap: BinaryHeap<ScheduledTimeEvent>, }
421
422impl TestClock {
423 #[must_use]
425 pub fn new() -> Self {
426 Self {
427 time: AtomicTime::new(false, UnixNanos::default()),
428 timers: BTreeMap::new(),
429 callbacks: CallbackRegistry::new(),
430 heap: BinaryHeap::new(),
431 }
432 }
433
434 #[must_use]
436 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
437 &self.timers
438 }
439
440 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
457 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
458
459 let from_time_ns = self.time.get_time_ns();
460
461 assert!(
462 to_time_ns >= from_time_ns,
463 "Invariant violated: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
464 );
465
466 if set_time {
467 self.time.set_time(to_time_ns);
468 }
469
470 let mut events: Vec<TimeEvent> = Vec::new();
472 self.timers.retain(|_, timer| {
473 timer.advance(to_time_ns).for_each(|event| {
474 events.push(event);
475 });
476
477 !timer.is_expired()
478 });
479
480 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
481 log::warn!(
482 "Allocated {} time events during clock advancement from {} to {}, \
483 consider stopping the timer between large time ranges with no data points",
484 events.len().separate_with_commas(),
485 from_time_ns,
486 to_time_ns
487 );
488 }
489
490 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
491 events
492 }
493
494 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
508 const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
509
510 let from_time_ns = self.time.get_time_ns();
511
512 assert!(
513 to_time_ns >= from_time_ns,
514 "Invariant violated: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
515 );
516
517 self.time.set_time(to_time_ns);
518
519 if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
520 log::warn!(
521 "TestClock heap size {} exceeds recommended limit",
522 self.heap.len()
523 );
524 }
525
526 self.timers.retain(|_, timer| {
528 timer.advance(to_time_ns).for_each(|event| {
529 self.heap.push(ScheduledTimeEvent::new(event));
530 });
531
532 !timer.is_expired()
533 });
534 }
535
536 #[must_use]
546 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
547 events
548 .into_iter()
549 .map(|event| self.callbacks.get_handler(event))
550 .collect()
551 }
552
553 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
554 if self.timer_exists(name) {
555 self.cancel_timer(name.as_str());
556 log::warn!("Timer '{name}' replaced");
557 }
558 }
559}
560
561impl Iterator for TestClock {
562 type Item = TimeEventHandlerV2;
563
564 fn next(&mut self) -> Option<Self::Item> {
565 self.heap
566 .pop()
567 .map(|event| self.get_handler(event.into_inner()))
568 }
569}
570
571impl Default for TestClock {
572 fn default() -> Self {
574 Self::new()
575 }
576}
577
578impl Deref for TestClock {
579 type Target = AtomicTime;
580
581 fn deref(&self) -> &Self::Target {
582 &self.time
583 }
584}
585
586impl Clock for TestClock {
587 fn timestamp_ns(&self) -> UnixNanos {
588 self.time.get_time_ns()
589 }
590
591 fn timestamp_us(&self) -> u64 {
592 self.time.get_time_us()
593 }
594
595 fn timestamp_ms(&self) -> u64 {
596 self.time.get_time_ms()
597 }
598
599 fn timestamp(&self) -> f64 {
600 self.time.get_time()
601 }
602
603 fn timer_names(&self) -> Vec<&str> {
604 self.timers
605 .iter()
606 .filter(|(_, timer)| !timer.is_expired())
607 .map(|(k, _)| k.as_str())
608 .collect()
609 }
610
611 fn timer_count(&self) -> usize {
612 self.timers
613 .iter()
614 .filter(|(_, timer)| !timer.is_expired())
615 .count()
616 }
617
618 fn timer_exists(&self, name: &Ustr) -> bool {
619 self.timers.contains_key(name)
620 }
621
622 fn register_default_handler(&mut self, callback: TimeEventCallback) {
623 self.callbacks.register_default_handler(callback);
624 }
625
626 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
632 self.callbacks.get_handler(event)
633 }
634
635 fn set_time_alert_ns(
636 &mut self,
637 name: &str,
638 alert_time_ns: UnixNanos,
639 callback: Option<TimeEventCallback>,
640 allow_past: Option<bool>,
641 ) -> anyhow::Result<()> {
642 let ts_now = self.get_time_ns();
643 let (name, alert_time_ns) =
644 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
645
646 self.replace_existing_timer_if_needed(&name);
647
648 check_predicate_true(
649 callback.is_some() | self.callbacks.has_any_callback(&name),
650 "No callbacks provided",
651 )?;
652
653 if let Some(callback) = callback {
654 self.callbacks.register_callback(name, callback);
655 }
656
657 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
659 let fire_immediately = alert_time_ns == ts_now;
660
661 let timer = TestTimer::new(
662 name,
663 interval_ns,
664 ts_now,
665 Some(alert_time_ns),
666 fire_immediately,
667 );
668 self.timers.insert(name, timer);
669
670 Ok(())
671 }
672
673 fn set_timer_ns(
674 &mut self,
675 name: &str,
676 interval_ns: u64,
677 start_time_ns: Option<UnixNanos>,
678 stop_time_ns: Option<UnixNanos>,
679 callback: Option<TimeEventCallback>,
680 allow_past: Option<bool>,
681 fire_immediately: Option<bool>,
682 ) -> anyhow::Result<()> {
683 let ts_now = self.get_time_ns();
684 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
685 validate_and_prepare_timer(
686 name,
687 interval_ns,
688 start_time_ns,
689 stop_time_ns,
690 allow_past,
691 fire_immediately,
692 ts_now,
693 )?;
694
695 check_predicate_true(
696 callback.is_some() | self.callbacks.has_any_callback(&name),
697 "No callbacks provided",
698 )?;
699
700 self.replace_existing_timer_if_needed(&name);
701
702 if let Some(callback) = callback {
703 self.callbacks.register_callback(name, callback);
704 }
705
706 let interval_ns = create_valid_interval(interval_ns);
707
708 let timer = TestTimer::new(
709 name,
710 interval_ns,
711 start_time_ns,
712 stop_time_ns,
713 fire_immediately,
714 );
715 self.timers.insert(name, timer);
716
717 Ok(())
718 }
719
720 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
721 self.timers
722 .get(&Ustr::from(name))
723 .map(|timer| timer.next_time_ns())
724 }
725
726 fn cancel_timer(&mut self, name: &str) {
727 let timer = self.timers.remove(&Ustr::from(name));
728 if let Some(mut timer) = timer {
729 timer.cancel();
730 }
731 }
732
733 fn cancel_timers(&mut self) {
734 for timer in &mut self.timers.values_mut() {
735 timer.cancel();
736 }
737
738 self.timers.clear();
739 }
740
741 fn reset(&mut self) {
742 self.time = AtomicTime::new(false, UnixNanos::default());
743 self.timers = BTreeMap::new();
744 self.heap = BinaryHeap::new();
745 self.callbacks.clear();
746 }
747}
748
749#[cfg(test)]
750mod tests {
751 use std::{
752 sync::{Arc, Mutex},
753 time::Duration,
754 };
755
756 use nautilus_core::{MUTEX_POISONED, UnixNanos};
757 use rstest::{fixture, rstest};
758 use ustr::Ustr;
759
760 use super::*;
761 use crate::timer::{TimeEvent, TimeEventCallback};
762
763 #[derive(Debug, Default)]
764 struct TestCallback {
765 called: Arc<Mutex<bool>>,
767 }
768
769 impl TestCallback {
770 fn new(called: Arc<Mutex<bool>>) -> Self {
771 Self { called }
772 }
773 }
774
775 impl From<TestCallback> for TimeEventCallback {
776 fn from(callback: TestCallback) -> Self {
777 Self::from(move |_event: TimeEvent| {
778 if let Ok(mut called) = callback.called.lock() {
779 *called = true;
780 }
781 })
782 }
783 }
784
785 #[fixture]
786 pub fn test_clock() -> TestClock {
787 let mut clock = TestClock::new();
788 clock.register_default_handler(TestCallback::default().into());
789 clock
790 }
791
792 #[rstest]
793 fn test_time_monotonicity(mut test_clock: TestClock) {
794 let initial_time = test_clock.timestamp_ns();
795 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
796 assert!(test_clock.timestamp_ns() > initial_time);
797 }
798
799 #[rstest]
800 fn test_timer_registration(mut test_clock: TestClock) {
801 test_clock
802 .set_time_alert_ns(
803 "test_timer",
804 (*test_clock.timestamp_ns() + 1000).into(),
805 None,
806 None,
807 )
808 .unwrap();
809 assert_eq!(test_clock.timer_count(), 1);
810 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
811 }
812
813 #[rstest]
814 fn test_timer_expiration(mut test_clock: TestClock) {
815 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
816 test_clock
817 .set_time_alert_ns("test_timer", alert_time, None, None)
818 .unwrap();
819 let events = test_clock.advance_time(alert_time, true);
820 assert_eq!(events.len(), 1);
821 assert_eq!(events[0].name.as_str(), "test_timer");
822 }
823
824 #[rstest]
825 fn test_timer_cancellation(mut test_clock: TestClock) {
826 test_clock
827 .set_time_alert_ns(
828 "test_timer",
829 (*test_clock.timestamp_ns() + 1000).into(),
830 None,
831 None,
832 )
833 .unwrap();
834 assert_eq!(test_clock.timer_count(), 1);
835 test_clock.cancel_timer("test_timer");
836 assert_eq!(test_clock.timer_count(), 0);
837 }
838
839 #[rstest]
840 fn test_time_advancement(mut test_clock: TestClock) {
841 let start_time = test_clock.timestamp_ns();
842 test_clock
843 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
844 .unwrap();
845 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
846 assert_eq!(events.len(), 2);
847 assert_eq!(*events[0].ts_event, *start_time + 1000);
848 assert_eq!(*events[1].ts_event, *start_time + 2000);
849 }
850
851 #[rstest]
852 fn test_default_and_custom_callbacks() {
853 let mut clock = TestClock::new();
854 let default_called = Arc::new(Mutex::new(false));
855 let custom_called = Arc::new(Mutex::new(false));
856
857 let default_callback = TestCallback::new(Arc::clone(&default_called));
858 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
859
860 clock.register_default_handler(TimeEventCallback::from(default_callback));
861 clock
862 .set_time_alert_ns(
863 "default_timer",
864 (*clock.timestamp_ns() + 1000).into(),
865 None,
866 None,
867 )
868 .unwrap();
869 clock
870 .set_time_alert_ns(
871 "custom_timer",
872 (*clock.timestamp_ns() + 1000).into(),
873 Some(TimeEventCallback::from(custom_callback)),
874 None,
875 )
876 .unwrap();
877
878 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
879 let handlers = clock.match_handlers(events);
880
881 for handler in handlers {
882 handler.callback.call(handler.event);
883 }
884
885 assert!(*default_called.lock().expect(MUTEX_POISONED));
886 assert!(*custom_called.lock().expect(MUTEX_POISONED));
887 }
888
889 #[rstest]
890 fn test_timer_with_rust_local_callback() {
891 use std::{cell::RefCell, rc::Rc};
892
893 let mut clock = TestClock::new();
894 let call_count = Rc::new(RefCell::new(0_u32));
895 let call_count_clone = Rc::clone(&call_count);
896
897 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event: TimeEvent| {
899 *call_count_clone.borrow_mut() += 1;
900 });
901
902 clock
903 .set_time_alert_ns(
904 "local_timer",
905 (*clock.timestamp_ns() + 1000).into(),
906 Some(TimeEventCallback::from(callback)),
907 None,
908 )
909 .unwrap();
910
911 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
912 let handlers = clock.match_handlers(events);
913
914 for handler in handlers {
915 handler.callback.call(handler.event);
916 }
917
918 assert_eq!(*call_count.borrow(), 1);
919 }
920
921 #[rstest]
922 fn test_multiple_timers(mut test_clock: TestClock) {
923 let start_time = test_clock.timestamp_ns();
924 test_clock
925 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
926 .unwrap();
927 test_clock
928 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
929 .unwrap();
930 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
931 assert_eq!(events.len(), 3);
932 assert_eq!(events[0].name.as_str(), "timer1");
933 assert_eq!(events[1].name.as_str(), "timer1");
934 assert_eq!(events[2].name.as_str(), "timer2");
935 }
936
937 #[rstest]
938 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
939 test_clock.set_time(UnixNanos::from(2000));
940 let current_time = test_clock.timestamp_ns();
941 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
942
943 test_clock
945 .set_time_alert_ns("past_timer", past_time, None, Some(true))
946 .unwrap();
947
948 assert_eq!(test_clock.timer_count(), 1);
950 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
951
952 let next_time = test_clock.next_time_ns("past_timer").unwrap();
954 assert!(next_time >= current_time);
955 }
956
957 #[rstest]
958 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
959 test_clock.set_time(UnixNanos::from(2000));
960 let current_time = test_clock.timestamp_ns();
961 let past_time = current_time - 1000;
962
963 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
965
966 assert!(result.is_err());
968 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
969
970 assert_eq!(test_clock.timer_count(), 0);
972 assert!(test_clock.timer_names().is_empty());
973 }
974
975 #[rstest]
976 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
977 test_clock.set_time(UnixNanos::from(2000));
978 let current_time = test_clock.timestamp_ns();
979 let start_time = current_time + 1000;
980 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
984 "invalid_timer",
985 100,
986 Some(start_time),
987 Some(stop_time),
988 None,
989 None,
990 None,
991 );
992
993 assert!(result.is_err());
995 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
996
997 assert_eq!(test_clock.timer_count(), 0);
999 }
1000
1001 #[rstest]
1002 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1003 let start_time = test_clock.timestamp_ns();
1004 let interval_ns = 1000;
1005
1006 test_clock
1007 .set_timer_ns(
1008 "fire_immediately_timer",
1009 interval_ns,
1010 Some(start_time),
1011 None,
1012 None,
1013 None,
1014 Some(true),
1015 )
1016 .unwrap();
1017
1018 let events = test_clock.advance_time(start_time + 2500, true);
1020
1021 assert_eq!(events.len(), 3);
1023 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
1027
1028 #[rstest]
1029 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1030 let start_time = test_clock.timestamp_ns();
1031 let interval_ns = 1000;
1032
1033 test_clock
1034 .set_timer_ns(
1035 "normal_timer",
1036 interval_ns,
1037 Some(start_time),
1038 None,
1039 None,
1040 None,
1041 Some(false),
1042 )
1043 .unwrap();
1044
1045 let events = test_clock.advance_time(start_time + 2500, true);
1047
1048 assert_eq!(events.len(), 2);
1050 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1053
1054 #[rstest]
1055 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1056 let start_time = test_clock.timestamp_ns();
1057 let interval_ns = 1000;
1058
1059 test_clock
1061 .set_timer_ns(
1062 "default_timer",
1063 interval_ns,
1064 Some(start_time),
1065 None,
1066 None,
1067 None,
1068 None,
1069 )
1070 .unwrap();
1071
1072 let events = test_clock.advance_time(start_time + 1500, true);
1073
1074 assert_eq!(events.len(), 1);
1076 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1078
1079 #[rstest]
1080 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1081 test_clock.set_time(5000.into());
1082 let interval_ns = 1000;
1083
1084 test_clock
1085 .set_timer_ns(
1086 "zero_start_timer",
1087 interval_ns,
1088 None,
1089 None,
1090 None,
1091 None,
1092 Some(true),
1093 )
1094 .unwrap();
1095
1096 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1097
1098 assert_eq!(events.len(), 3);
1101 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1103 assert_eq!(*events[2].ts_event, 7000);
1104 }
1105
1106 #[rstest]
1107 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1108 let start_time = test_clock.timestamp_ns();
1109 let interval_ns = 1000;
1110
1111 test_clock
1113 .set_timer_ns(
1114 "immediate_timer",
1115 interval_ns,
1116 Some(start_time),
1117 None,
1118 None,
1119 None,
1120 Some(true),
1121 )
1122 .unwrap();
1123
1124 test_clock
1126 .set_timer_ns(
1127 "normal_timer",
1128 interval_ns,
1129 Some(start_time),
1130 None,
1131 None,
1132 None,
1133 Some(false),
1134 )
1135 .unwrap();
1136
1137 let events = test_clock.advance_time(start_time + 1500, true);
1138
1139 assert_eq!(events.len(), 3);
1141
1142 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1144 event_times.sort();
1145
1146 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1150
1151 #[rstest]
1152 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1153 let start_time = test_clock.timestamp_ns();
1154
1155 test_clock
1157 .set_timer_ns(
1158 "collision_timer",
1159 1000,
1160 Some(start_time),
1161 None,
1162 None,
1163 None,
1164 None,
1165 )
1166 .unwrap();
1167
1168 let result = test_clock.set_timer_ns(
1170 "collision_timer",
1171 2000,
1172 Some(start_time),
1173 None,
1174 None,
1175 None,
1176 None,
1177 );
1178
1179 assert!(result.is_ok());
1180 assert_eq!(test_clock.timer_count(), 1);
1182
1183 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1185 assert_eq!(next_time, start_time + 2000);
1187 }
1188
1189 #[rstest]
1190 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1191 let start_time = test_clock.timestamp_ns();
1192
1193 let result =
1195 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1196
1197 assert!(result.is_err());
1198 assert_eq!(test_clock.timer_count(), 0);
1199 }
1200
1201 #[rstest]
1202 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1203 let start_time = test_clock.timestamp_ns();
1204
1205 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1207
1208 assert!(result.is_err());
1209 assert_eq!(test_clock.timer_count(), 0);
1210 }
1211
1212 #[rstest]
1213 fn test_timer_exists(mut test_clock: TestClock) {
1214 let name = Ustr::from("exists_timer");
1215 assert!(!test_clock.timer_exists(&name));
1216
1217 test_clock
1218 .set_time_alert_ns(
1219 name.as_str(),
1220 (*test_clock.timestamp_ns() + 1_000).into(),
1221 None,
1222 None,
1223 )
1224 .unwrap();
1225
1226 assert!(test_clock.timer_exists(&name));
1227 }
1228
1229 #[rstest]
1230 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1231 test_clock.set_time(UnixNanos::from(10_000));
1232 let current = test_clock.timestamp_ns();
1233
1234 let result = test_clock.set_timer_ns(
1235 "past_stop",
1236 10_000,
1237 Some(current - 500),
1238 Some(current - 100),
1239 None,
1240 Some(false),
1241 None,
1242 );
1243
1244 let err = result.expect_err("expected stop time validation error");
1245 let err_msg = err.to_string();
1246 assert!(err_msg.contains("stop time"));
1247 assert!(err_msg.contains("in the past"));
1248 }
1249
1250 #[rstest]
1251 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1252 let current = test_clock.timestamp_ns();
1253
1254 let result = test_clock.set_timer_ns(
1255 "future_stop",
1256 1_000,
1257 Some(current),
1258 Some(current + 10_000),
1259 None,
1260 Some(false),
1261 None,
1262 );
1263
1264 assert!(result.is_ok());
1265 }
1266
1267 #[rstest]
1268 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1269 let start_time = test_clock.timestamp_ns();
1270 let interval_ns = 1000;
1271 let stop_time = start_time + interval_ns; test_clock
1274 .set_timer_ns(
1275 "exact_stop",
1276 interval_ns,
1277 Some(start_time),
1278 Some(stop_time),
1279 None,
1280 None,
1281 Some(true),
1282 )
1283 .unwrap();
1284
1285 let events = test_clock.advance_time(stop_time, true);
1286
1287 assert_eq!(events.len(), 2);
1289 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1292
1293 #[rstest]
1294 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1295 let start_time = test_clock.timestamp_ns();
1296 let interval_ns = 1000;
1297
1298 test_clock
1299 .set_timer_ns(
1300 "exact_advance",
1301 interval_ns,
1302 Some(start_time),
1303 None,
1304 None,
1305 None,
1306 Some(false),
1307 )
1308 .unwrap();
1309
1310 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1312 let events = test_clock.advance_time(next_time, true);
1313
1314 assert_eq!(events.len(), 1);
1315 assert_eq!(*events[0].ts_event, *next_time);
1316 }
1317
1318 #[rstest]
1319 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1320 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1330 "bar_timer",
1331 interval_ns,
1332 Some(bar_start_time),
1333 None,
1334 None,
1335 Some(false), Some(false), );
1338
1339 assert!(result.is_ok());
1341 assert_eq!(test_clock.timer_count(), 1);
1342
1343 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1345 assert_eq!(*next_time, 101_000);
1346 }
1347
1348 #[rstest]
1349 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1350 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1359 "past_event_timer",
1360 interval_ns,
1361 Some(past_start_time),
1362 None,
1363 None,
1364 Some(false), Some(false), );
1367
1368 assert!(result.is_err());
1370 assert!(
1371 result
1372 .unwrap_err()
1373 .to_string()
1374 .contains("would be in the past")
1375 );
1376 }
1377
1378 #[rstest]
1379 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1380 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1384
1385 let result = test_clock.set_timer_ns(
1388 "immediate_past_timer",
1389 interval_ns,
1390 Some(past_start_time),
1391 None,
1392 None,
1393 Some(false), Some(true), );
1396
1397 assert!(result.is_err());
1399 assert!(
1400 result
1401 .unwrap_err()
1402 .to_string()
1403 .contains("would be in the past")
1404 );
1405 }
1406
1407 #[rstest]
1408 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1409 let start_time = test_clock.timestamp_ns();
1410
1411 test_clock
1412 .set_timer_ns(
1413 "cancel_test",
1414 1000,
1415 Some(start_time),
1416 None,
1417 None,
1418 None,
1419 None,
1420 )
1421 .unwrap();
1422
1423 assert_eq!(test_clock.timer_count(), 1);
1424
1425 test_clock.cancel_timer("cancel_test");
1427
1428 assert_eq!(test_clock.timer_count(), 0);
1429
1430 let events = test_clock.advance_time(start_time + 2000, true);
1432 assert_eq!(events.len(), 0);
1433 }
1434
1435 #[rstest]
1436 fn test_cancel_all_timers(mut test_clock: TestClock) {
1437 test_clock
1439 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1440 .unwrap();
1441 test_clock
1442 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1443 .unwrap();
1444 test_clock
1445 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1446 .unwrap();
1447
1448 assert_eq!(test_clock.timer_count(), 3);
1449
1450 test_clock.cancel_timers();
1452
1453 assert_eq!(test_clock.timer_count(), 0);
1454
1455 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1457 assert_eq!(events.len(), 0);
1458 }
1459
1460 #[rstest]
1461 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1462 test_clock
1463 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1464 .unwrap();
1465
1466 assert_eq!(test_clock.timer_count(), 1);
1467
1468 test_clock.reset();
1470
1471 assert_eq!(test_clock.timer_count(), 0);
1472 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1474
1475 #[rstest]
1476 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1477 let current_time = test_clock.utc_now();
1478 let alert_time = current_time + chrono::Duration::seconds(1);
1479
1480 test_clock
1482 .set_time_alert("alert_test", alert_time, None, None)
1483 .unwrap();
1484
1485 assert_eq!(test_clock.timer_count(), 1);
1486 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1487
1488 let expected_ns = UnixNanos::from(alert_time);
1490 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1491
1492 let diff = if next_time >= expected_ns {
1494 next_time.as_u64() - expected_ns.as_u64()
1495 } else {
1496 expected_ns.as_u64() - next_time.as_u64()
1497 };
1498 assert!(
1499 diff < 1000,
1500 "Timer should be set within 1 microsecond of expected time"
1501 );
1502 }
1503
1504 #[rstest]
1505 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1506 let current_time = test_clock.utc_now();
1507 let start_time = current_time + chrono::Duration::seconds(1);
1508 let interval = Duration::from_millis(500);
1509
1510 test_clock
1512 .set_timer(
1513 "timer_test",
1514 interval,
1515 Some(start_time),
1516 None,
1517 None,
1518 None,
1519 None,
1520 )
1521 .unwrap();
1522
1523 assert_eq!(test_clock.timer_count(), 1);
1524 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1525
1526 let start_ns = UnixNanos::from(start_time);
1528 let interval_ns = interval.as_nanos() as u64;
1529
1530 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1531 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1535 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1536 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1537 }
1538
1539 #[rstest]
1540 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1541 let current_time = test_clock.utc_now();
1542 let start_time = current_time + chrono::Duration::seconds(1);
1543 let stop_time = current_time + chrono::Duration::seconds(3);
1544 let interval = Duration::from_secs(1);
1545
1546 test_clock
1548 .set_timer(
1549 "timer_with_stop",
1550 interval,
1551 Some(start_time),
1552 Some(stop_time),
1553 None,
1554 None,
1555 None,
1556 )
1557 .unwrap();
1558
1559 assert_eq!(test_clock.timer_count(), 1);
1560
1561 let stop_ns = UnixNanos::from(stop_time);
1563 let events = test_clock.advance_time(stop_ns + 1000, true);
1564
1565 assert_eq!(events.len(), 2);
1567
1568 let start_ns = UnixNanos::from(start_time);
1569 let interval_ns = interval.as_nanos() as u64;
1570 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1571 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1572 }
1573
1574 #[rstest]
1575 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1576 let current_time = test_clock.utc_now();
1577 let start_time = current_time + chrono::Duration::seconds(1);
1578 let interval = Duration::from_millis(500);
1579
1580 test_clock
1582 .set_timer(
1583 "immediate_timer",
1584 interval,
1585 Some(start_time),
1586 None,
1587 None,
1588 None,
1589 Some(true),
1590 )
1591 .unwrap();
1592
1593 let start_ns = UnixNanos::from(start_time);
1594 let interval_ns = interval.as_nanos() as u64;
1595
1596 let events = test_clock.advance_time(start_ns + interval_ns, true);
1598
1599 assert_eq!(events.len(), 2);
1601 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1604
1605 #[rstest]
1606 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1607 let current_time = test_clock.timestamp_ns();
1608
1609 test_clock
1611 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1612 .unwrap();
1613
1614 assert_eq!(test_clock.timer_count(), 1);
1615
1616 let events = test_clock.advance_time(current_time, true);
1618
1619 assert_eq!(events.len(), 1);
1621 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1622 assert_eq!(*events[0].ts_event, *current_time);
1623 }
1624}