1use std::{
19 any::Any,
20 collections::{BTreeMap, BinaryHeap},
21 fmt::Debug,
22 ops::Deref,
23 time::Duration,
24};
25
26use ahash::AHashMap;
27use chrono::{DateTime, Utc};
28use nautilus_core::{
29 AtomicTime, UnixNanos,
30 correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
31};
32use thousands::Separable;
33use ustr::Ustr;
34
35use crate::timer::{
36 ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
37 create_valid_interval,
38};
39
40pub trait Clock: Debug + Any {
46 fn utc_now(&self) -> DateTime<Utc> {
48 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
49 }
50
51 fn timestamp_ns(&self) -> UnixNanos;
53
54 fn timestamp_us(&self) -> u64;
56
57 fn timestamp_ms(&self) -> u64;
59
60 fn timestamp(&self) -> f64;
62
63 fn timer_names(&self) -> Vec<&str>;
65
66 fn timer_count(&self) -> usize;
68
69 fn timer_exists(&self, name: &Ustr) -> bool;
71
72 fn register_default_handler(&mut self, callback: TimeEventCallback);
75
76 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
80
81 #[allow(clippy::too_many_arguments)]
95 fn set_time_alert(
96 &mut self,
97 name: &str,
98 alert_time: DateTime<Utc>,
99 callback: Option<TimeEventCallback>,
100 allow_past: Option<bool>,
101 ) -> anyhow::Result<()> {
102 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
103 }
104
105 #[allow(clippy::too_many_arguments)]
126 fn set_time_alert_ns(
127 &mut self,
128 name: &str,
129 alert_time_ns: UnixNanos,
130 callback: Option<TimeEventCallback>,
131 allow_past: Option<bool>,
132 ) -> anyhow::Result<()>;
133
134 #[allow(clippy::too_many_arguments)]
150 fn set_timer(
151 &mut self,
152 name: &str,
153 interval: Duration,
154 start_time: Option<DateTime<Utc>>,
155 stop_time: Option<DateTime<Utc>>,
156 callback: Option<TimeEventCallback>,
157 allow_past: Option<bool>,
158 fire_immediately: Option<bool>,
159 ) -> anyhow::Result<()> {
160 self.set_timer_ns(
161 name,
162 interval.as_nanos() as u64,
163 start_time.map(UnixNanos::from),
164 stop_time.map(UnixNanos::from),
165 callback,
166 allow_past,
167 fire_immediately,
168 )
169 }
170
171 #[allow(clippy::too_many_arguments)]
199 fn set_timer_ns(
200 &mut self,
201 name: &str,
202 interval_ns: u64,
203 start_time_ns: Option<UnixNanos>,
204 stop_time_ns: Option<UnixNanos>,
205 callback: Option<TimeEventCallback>,
206 allow_past: Option<bool>,
207 fire_immediately: Option<bool>,
208 ) -> anyhow::Result<()>;
209
210 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
214
215 fn cancel_timer(&mut self, name: &str);
217
218 fn cancel_timers(&mut self);
220
221 fn reset(&mut self);
223}
224
225impl dyn Clock {
226 pub fn as_any(&self) -> &dyn std::any::Any {
228 self
229 }
230 pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
232 self
233 }
234}
235
236#[derive(Debug, Default)]
241pub struct CallbackRegistry {
242 default_callback: Option<TimeEventCallback>,
243 callbacks: AHashMap<Ustr, TimeEventCallback>,
244}
245
246impl CallbackRegistry {
247 #[must_use]
249 pub fn new() -> Self {
250 Self {
251 default_callback: None,
252 callbacks: AHashMap::new(),
253 }
254 }
255
256 pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
258 self.default_callback = Some(callback);
259 }
260
261 pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
263 self.callbacks.insert(name, callback);
264 }
265
266 #[must_use]
268 pub fn has_any_callback(&self, name: &Ustr) -> bool {
269 self.callbacks.contains_key(name) || self.default_callback.is_some()
270 }
271
272 #[must_use]
274 pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
275 self.callbacks
276 .get(name)
277 .cloned()
278 .or_else(|| self.default_callback.clone())
279 }
280
281 #[must_use]
287 pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
288 let callback = self
289 .get_callback(&event.name)
290 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
291
292 TimeEventHandlerV2::new(event, callback)
293 }
294
295 pub fn clear(&mut self) {
297 self.callbacks.clear();
298 }
299}
300
301pub fn validate_and_prepare_time_alert(
309 name: &str,
310 mut alert_time_ns: UnixNanos,
311 allow_past: Option<bool>,
312 ts_now: UnixNanos,
313) -> anyhow::Result<(Ustr, UnixNanos)> {
314 check_valid_string_utf8(name, stringify!(name))?;
315
316 let name = Ustr::from(name);
317 let allow_past = allow_past.unwrap_or(true);
318
319 if alert_time_ns < ts_now {
320 if allow_past {
321 alert_time_ns = ts_now;
322 log::warn!(
323 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
324 alert_time_ns.to_rfc3339(),
325 );
326 } else {
327 anyhow::bail!(
328 "Timer '{name}' alert time {} was in the past (current time is {ts_now})",
329 alert_time_ns.to_rfc3339(),
330 );
331 }
332 }
333
334 Ok((name, alert_time_ns))
335}
336
337pub fn validate_and_prepare_timer(
346 name: &str,
347 interval_ns: u64,
348 start_time_ns: Option<UnixNanos>,
349 stop_time_ns: Option<UnixNanos>,
350 allow_past: Option<bool>,
351 fire_immediately: Option<bool>,
352 ts_now: UnixNanos,
353) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
354 check_valid_string_utf8(name, stringify!(name))?;
355 check_positive_u64(interval_ns, stringify!(interval_ns))?;
356
357 let name = Ustr::from(name);
358 let allow_past = allow_past.unwrap_or(true);
359 let fire_immediately = fire_immediately.unwrap_or(false);
360
361 let mut start_time_ns = start_time_ns.unwrap_or_default();
362
363 if start_time_ns == 0 {
364 start_time_ns = ts_now;
366 } else if !allow_past {
367 let next_event_time = if fire_immediately {
368 start_time_ns
369 } else {
370 start_time_ns + interval_ns
371 };
372
373 if next_event_time < ts_now {
374 anyhow::bail!(
375 "Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
376 next_event_time.to_rfc3339(),
377 );
378 }
379 }
380
381 if let Some(stop_time) = stop_time_ns {
382 if stop_time <= start_time_ns {
383 anyhow::bail!(
384 "Timer '{name}' stop time {} must be after start time {}",
385 stop_time.to_rfc3339(),
386 start_time_ns.to_rfc3339(),
387 );
388 }
389 if !allow_past && stop_time <= ts_now {
390 anyhow::bail!(
391 "Timer '{name}' stop time {} is in the past (current time is {ts_now})",
392 stop_time.to_rfc3339(),
393 );
394 }
395 }
396
397 Ok((
398 name,
399 start_time_ns,
400 stop_time_ns,
401 allow_past,
402 fire_immediately,
403 ))
404}
405
406#[derive(Debug)]
414pub struct TestClock {
415 time: AtomicTime,
416 timers: BTreeMap<Ustr, TestTimer>,
418 callbacks: CallbackRegistry,
419 heap: BinaryHeap<ScheduledTimeEvent>, }
421
422impl TestClock {
423 #[must_use]
425 pub fn new() -> Self {
426 Self {
427 time: AtomicTime::new(false, UnixNanos::default()),
428 timers: BTreeMap::new(),
429 callbacks: CallbackRegistry::new(),
430 heap: BinaryHeap::new(),
431 }
432 }
433
434 #[must_use]
436 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
437 &self.timers
438 }
439
440 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
457 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
458
459 let from_time_ns = self.time.get_time_ns();
460
461 assert!(
463 to_time_ns >= from_time_ns,
464 "`to_time_ns` {to_time_ns} was < `from_time_ns` {from_time_ns}"
465 );
466
467 if set_time {
468 self.time.set_time(to_time_ns);
469 }
470
471 let mut events: Vec<TimeEvent> = Vec::new();
473 self.timers.retain(|_, timer| {
474 timer.advance(to_time_ns).for_each(|event| {
475 events.push(event);
476 });
477
478 !timer.is_expired()
479 });
480
481 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
482 log::warn!(
483 "Allocated {} time events during clock advancement from {} to {}, \
484 consider stopping the timer between large time ranges with no data points",
485 events.len().separate_with_commas(),
486 from_time_ns,
487 to_time_ns
488 );
489 }
490
491 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
492 events
493 }
494
495 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
509 const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
510
511 let from_time_ns = self.time.get_time_ns();
512
513 assert!(
515 to_time_ns >= from_time_ns,
516 "`to_time_ns` {to_time_ns} was < `from_time_ns` {from_time_ns}"
517 );
518
519 self.time.set_time(to_time_ns);
520
521 if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
522 log::warn!(
523 "TestClock heap size {} exceeds recommended limit",
524 self.heap.len()
525 );
526 }
527
528 self.timers.retain(|_, timer| {
530 timer.advance(to_time_ns).for_each(|event| {
531 self.heap.push(ScheduledTimeEvent::new(event));
532 });
533
534 !timer.is_expired()
535 });
536 }
537
538 #[must_use]
548 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
549 events
550 .into_iter()
551 .map(|event| self.callbacks.get_handler(event))
552 .collect()
553 }
554
555 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
556 if self.timer_exists(name) {
557 self.cancel_timer(name.as_str());
558 log::warn!("Timer '{name}' replaced");
559 }
560 }
561}
562
563impl Iterator for TestClock {
564 type Item = TimeEventHandlerV2;
565
566 fn next(&mut self) -> Option<Self::Item> {
567 self.heap
568 .pop()
569 .map(|event| self.get_handler(event.into_inner()))
570 }
571}
572
573impl Default for TestClock {
574 fn default() -> Self {
576 Self::new()
577 }
578}
579
580impl Deref for TestClock {
581 type Target = AtomicTime;
582
583 fn deref(&self) -> &Self::Target {
584 &self.time
585 }
586}
587
588impl Clock for TestClock {
589 fn timestamp_ns(&self) -> UnixNanos {
590 self.time.get_time_ns()
591 }
592
593 fn timestamp_us(&self) -> u64 {
594 self.time.get_time_us()
595 }
596
597 fn timestamp_ms(&self) -> u64 {
598 self.time.get_time_ms()
599 }
600
601 fn timestamp(&self) -> f64 {
602 self.time.get_time()
603 }
604
605 fn timer_names(&self) -> Vec<&str> {
606 self.timers
607 .iter()
608 .filter(|(_, timer)| !timer.is_expired())
609 .map(|(k, _)| k.as_str())
610 .collect()
611 }
612
613 fn timer_count(&self) -> usize {
614 self.timers
615 .iter()
616 .filter(|(_, timer)| !timer.is_expired())
617 .count()
618 }
619
620 fn timer_exists(&self, name: &Ustr) -> bool {
621 self.timers.contains_key(name)
622 }
623
624 fn register_default_handler(&mut self, callback: TimeEventCallback) {
625 self.callbacks.register_default_handler(callback);
626 }
627
628 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
634 self.callbacks.get_handler(event)
635 }
636
637 fn set_time_alert_ns(
638 &mut self,
639 name: &str,
640 alert_time_ns: UnixNanos,
641 callback: Option<TimeEventCallback>,
642 allow_past: Option<bool>,
643 ) -> anyhow::Result<()> {
644 let ts_now = self.get_time_ns();
645 let (name, alert_time_ns) =
646 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
647
648 self.replace_existing_timer_if_needed(&name);
649
650 check_predicate_true(
651 callback.is_some() | self.callbacks.has_any_callback(&name),
652 "No callbacks provided",
653 )?;
654
655 if let Some(callback) = callback {
656 self.callbacks.register_callback(name, callback);
657 }
658
659 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
661 let fire_immediately = alert_time_ns == ts_now;
662
663 let timer = TestTimer::new(
664 name,
665 interval_ns,
666 ts_now,
667 Some(alert_time_ns),
668 fire_immediately,
669 );
670 self.timers.insert(name, timer);
671
672 Ok(())
673 }
674
675 fn set_timer_ns(
676 &mut self,
677 name: &str,
678 interval_ns: u64,
679 start_time_ns: Option<UnixNanos>,
680 stop_time_ns: Option<UnixNanos>,
681 callback: Option<TimeEventCallback>,
682 allow_past: Option<bool>,
683 fire_immediately: Option<bool>,
684 ) -> anyhow::Result<()> {
685 let ts_now = self.get_time_ns();
686 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
687 validate_and_prepare_timer(
688 name,
689 interval_ns,
690 start_time_ns,
691 stop_time_ns,
692 allow_past,
693 fire_immediately,
694 ts_now,
695 )?;
696
697 check_predicate_true(
698 callback.is_some() | self.callbacks.has_any_callback(&name),
699 "No callbacks provided",
700 )?;
701
702 self.replace_existing_timer_if_needed(&name);
703
704 if let Some(callback) = callback {
705 self.callbacks.register_callback(name, callback);
706 }
707
708 let interval_ns = create_valid_interval(interval_ns);
709
710 let timer = TestTimer::new(
711 name,
712 interval_ns,
713 start_time_ns,
714 stop_time_ns,
715 fire_immediately,
716 );
717 self.timers.insert(name, timer);
718
719 Ok(())
720 }
721
722 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
723 self.timers
724 .get(&Ustr::from(name))
725 .map(|timer| timer.next_time_ns())
726 }
727
728 fn cancel_timer(&mut self, name: &str) {
729 let timer = self.timers.remove(&Ustr::from(name));
730 if let Some(mut timer) = timer {
731 timer.cancel();
732 }
733 }
734
735 fn cancel_timers(&mut self) {
736 for timer in &mut self.timers.values_mut() {
737 timer.cancel();
738 }
739
740 self.timers.clear();
741 }
742
743 fn reset(&mut self) {
744 self.time = AtomicTime::new(false, UnixNanos::default());
745 self.timers = BTreeMap::new();
746 self.heap = BinaryHeap::new();
747 self.callbacks.clear();
748 }
749}
750
751#[cfg(test)]
752mod tests {
753 use std::{
754 sync::{Arc, Mutex},
755 time::Duration,
756 };
757
758 use nautilus_core::{MUTEX_POISONED, UnixNanos};
759 use rstest::{fixture, rstest};
760 use ustr::Ustr;
761
762 use super::*;
763 use crate::timer::{TimeEvent, TimeEventCallback};
764
765 #[derive(Debug, Default)]
766 struct TestCallback {
767 called: Arc<Mutex<bool>>,
769 }
770
771 impl TestCallback {
772 fn new(called: Arc<Mutex<bool>>) -> Self {
773 Self { called }
774 }
775 }
776
777 impl From<TestCallback> for TimeEventCallback {
778 fn from(callback: TestCallback) -> Self {
779 Self::from(move |_event: TimeEvent| {
780 if let Ok(mut called) = callback.called.lock() {
781 *called = true;
782 }
783 })
784 }
785 }
786
787 #[fixture]
788 pub fn test_clock() -> TestClock {
789 let mut clock = TestClock::new();
790 clock.register_default_handler(TestCallback::default().into());
791 clock
792 }
793
794 #[rstest]
795 fn test_time_monotonicity(mut test_clock: TestClock) {
796 let initial_time = test_clock.timestamp_ns();
797 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
798 assert!(test_clock.timestamp_ns() > initial_time);
799 }
800
801 #[rstest]
802 fn test_timer_registration(mut test_clock: TestClock) {
803 test_clock
804 .set_time_alert_ns(
805 "test_timer",
806 (*test_clock.timestamp_ns() + 1000).into(),
807 None,
808 None,
809 )
810 .unwrap();
811 assert_eq!(test_clock.timer_count(), 1);
812 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
813 }
814
815 #[rstest]
816 fn test_timer_expiration(mut test_clock: TestClock) {
817 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
818 test_clock
819 .set_time_alert_ns("test_timer", alert_time, None, None)
820 .unwrap();
821 let events = test_clock.advance_time(alert_time, true);
822 assert_eq!(events.len(), 1);
823 assert_eq!(events[0].name.as_str(), "test_timer");
824 }
825
826 #[rstest]
827 fn test_timer_cancellation(mut test_clock: TestClock) {
828 test_clock
829 .set_time_alert_ns(
830 "test_timer",
831 (*test_clock.timestamp_ns() + 1000).into(),
832 None,
833 None,
834 )
835 .unwrap();
836 assert_eq!(test_clock.timer_count(), 1);
837 test_clock.cancel_timer("test_timer");
838 assert_eq!(test_clock.timer_count(), 0);
839 }
840
841 #[rstest]
842 fn test_time_advancement(mut test_clock: TestClock) {
843 let start_time = test_clock.timestamp_ns();
844 test_clock
845 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
846 .unwrap();
847 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
848 assert_eq!(events.len(), 2);
849 assert_eq!(*events[0].ts_event, *start_time + 1000);
850 assert_eq!(*events[1].ts_event, *start_time + 2000);
851 }
852
853 #[rstest]
854 fn test_default_and_custom_callbacks() {
855 let mut clock = TestClock::new();
856 let default_called = Arc::new(Mutex::new(false));
857 let custom_called = Arc::new(Mutex::new(false));
858
859 let default_callback = TestCallback::new(Arc::clone(&default_called));
860 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
861
862 clock.register_default_handler(TimeEventCallback::from(default_callback));
863 clock
864 .set_time_alert_ns(
865 "default_timer",
866 (*clock.timestamp_ns() + 1000).into(),
867 None,
868 None,
869 )
870 .unwrap();
871 clock
872 .set_time_alert_ns(
873 "custom_timer",
874 (*clock.timestamp_ns() + 1000).into(),
875 Some(TimeEventCallback::from(custom_callback)),
876 None,
877 )
878 .unwrap();
879
880 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
881 let handlers = clock.match_handlers(events);
882
883 for handler in handlers {
884 handler.callback.call(handler.event);
885 }
886
887 assert!(*default_called.lock().expect(MUTEX_POISONED));
888 assert!(*custom_called.lock().expect(MUTEX_POISONED));
889 }
890
891 #[rstest]
892 fn test_multiple_timers(mut test_clock: TestClock) {
893 let start_time = test_clock.timestamp_ns();
894 test_clock
895 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
896 .unwrap();
897 test_clock
898 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
899 .unwrap();
900 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
901 assert_eq!(events.len(), 3);
902 assert_eq!(events[0].name.as_str(), "timer1");
903 assert_eq!(events[1].name.as_str(), "timer1");
904 assert_eq!(events[2].name.as_str(), "timer2");
905 }
906
907 #[rstest]
908 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
909 test_clock.set_time(UnixNanos::from(2000));
910 let current_time = test_clock.timestamp_ns();
911 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
912
913 test_clock
915 .set_time_alert_ns("past_timer", past_time, None, Some(true))
916 .unwrap();
917
918 assert_eq!(test_clock.timer_count(), 1);
920 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
921
922 let next_time = test_clock.next_time_ns("past_timer").unwrap();
924 assert!(next_time >= current_time);
925 }
926
927 #[rstest]
928 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
929 test_clock.set_time(UnixNanos::from(2000));
930 let current_time = test_clock.timestamp_ns();
931 let past_time = current_time - 1000;
932
933 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
935
936 assert!(result.is_err());
938 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
939
940 assert_eq!(test_clock.timer_count(), 0);
942 assert!(test_clock.timer_names().is_empty());
943 }
944
945 #[rstest]
946 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
947 test_clock.set_time(UnixNanos::from(2000));
948 let current_time = test_clock.timestamp_ns();
949 let start_time = current_time + 1000;
950 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
954 "invalid_timer",
955 100,
956 Some(start_time),
957 Some(stop_time),
958 None,
959 None,
960 None,
961 );
962
963 assert!(result.is_err());
965 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
966
967 assert_eq!(test_clock.timer_count(), 0);
969 }
970
971 #[rstest]
972 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
973 let start_time = test_clock.timestamp_ns();
974 let interval_ns = 1000;
975
976 test_clock
977 .set_timer_ns(
978 "fire_immediately_timer",
979 interval_ns,
980 Some(start_time),
981 None,
982 None,
983 None,
984 Some(true),
985 )
986 .unwrap();
987
988 let events = test_clock.advance_time(start_time + 2500, true);
990
991 assert_eq!(events.len(), 3);
993 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
997
998 #[rstest]
999 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1000 let start_time = test_clock.timestamp_ns();
1001 let interval_ns = 1000;
1002
1003 test_clock
1004 .set_timer_ns(
1005 "normal_timer",
1006 interval_ns,
1007 Some(start_time),
1008 None,
1009 None,
1010 None,
1011 Some(false),
1012 )
1013 .unwrap();
1014
1015 let events = test_clock.advance_time(start_time + 2500, true);
1017
1018 assert_eq!(events.len(), 2);
1020 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1023
1024 #[rstest]
1025 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1026 let start_time = test_clock.timestamp_ns();
1027 let interval_ns = 1000;
1028
1029 test_clock
1031 .set_timer_ns(
1032 "default_timer",
1033 interval_ns,
1034 Some(start_time),
1035 None,
1036 None,
1037 None,
1038 None,
1039 )
1040 .unwrap();
1041
1042 let events = test_clock.advance_time(start_time + 1500, true);
1043
1044 assert_eq!(events.len(), 1);
1046 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1048
1049 #[rstest]
1050 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1051 test_clock.set_time(5000.into());
1052 let interval_ns = 1000;
1053
1054 test_clock
1055 .set_timer_ns(
1056 "zero_start_timer",
1057 interval_ns,
1058 None,
1059 None,
1060 None,
1061 None,
1062 Some(true),
1063 )
1064 .unwrap();
1065
1066 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1067
1068 assert_eq!(events.len(), 3);
1071 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1073 assert_eq!(*events[2].ts_event, 7000);
1074 }
1075
1076 #[rstest]
1077 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1078 let start_time = test_clock.timestamp_ns();
1079 let interval_ns = 1000;
1080
1081 test_clock
1083 .set_timer_ns(
1084 "immediate_timer",
1085 interval_ns,
1086 Some(start_time),
1087 None,
1088 None,
1089 None,
1090 Some(true),
1091 )
1092 .unwrap();
1093
1094 test_clock
1096 .set_timer_ns(
1097 "normal_timer",
1098 interval_ns,
1099 Some(start_time),
1100 None,
1101 None,
1102 None,
1103 Some(false),
1104 )
1105 .unwrap();
1106
1107 let events = test_clock.advance_time(start_time + 1500, true);
1108
1109 assert_eq!(events.len(), 3);
1111
1112 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1114 event_times.sort();
1115
1116 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1120
1121 #[rstest]
1122 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1123 let start_time = test_clock.timestamp_ns();
1124
1125 test_clock
1127 .set_timer_ns(
1128 "collision_timer",
1129 1000,
1130 Some(start_time),
1131 None,
1132 None,
1133 None,
1134 None,
1135 )
1136 .unwrap();
1137
1138 let result = test_clock.set_timer_ns(
1140 "collision_timer",
1141 2000,
1142 Some(start_time),
1143 None,
1144 None,
1145 None,
1146 None,
1147 );
1148
1149 assert!(result.is_ok());
1150 assert_eq!(test_clock.timer_count(), 1);
1152
1153 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1155 assert_eq!(next_time, start_time + 2000);
1157 }
1158
1159 #[rstest]
1160 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1161 let start_time = test_clock.timestamp_ns();
1162
1163 let result =
1165 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1166
1167 assert!(result.is_err());
1168 assert_eq!(test_clock.timer_count(), 0);
1169 }
1170
1171 #[rstest]
1172 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1173 let start_time = test_clock.timestamp_ns();
1174
1175 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1177
1178 assert!(result.is_err());
1179 assert_eq!(test_clock.timer_count(), 0);
1180 }
1181
1182 #[rstest]
1183 fn test_timer_exists(mut test_clock: TestClock) {
1184 let name = Ustr::from("exists_timer");
1185 assert!(!test_clock.timer_exists(&name));
1186
1187 test_clock
1188 .set_time_alert_ns(
1189 name.as_str(),
1190 (*test_clock.timestamp_ns() + 1_000).into(),
1191 None,
1192 None,
1193 )
1194 .unwrap();
1195
1196 assert!(test_clock.timer_exists(&name));
1197 }
1198
1199 #[rstest]
1200 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1201 test_clock.set_time(UnixNanos::from(10_000));
1202 let current = test_clock.timestamp_ns();
1203
1204 let result = test_clock.set_timer_ns(
1205 "past_stop",
1206 10_000,
1207 Some(current - 500),
1208 Some(current - 100),
1209 None,
1210 Some(false),
1211 None,
1212 );
1213
1214 let err = result.expect_err("expected stop time validation error");
1215 let err_msg = err.to_string();
1216 assert!(err_msg.contains("stop time"));
1217 assert!(err_msg.contains("in the past"));
1218 }
1219
1220 #[rstest]
1221 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1222 let current = test_clock.timestamp_ns();
1223
1224 let result = test_clock.set_timer_ns(
1225 "future_stop",
1226 1_000,
1227 Some(current),
1228 Some(current + 10_000),
1229 None,
1230 Some(false),
1231 None,
1232 );
1233
1234 assert!(result.is_ok());
1235 }
1236
1237 #[rstest]
1238 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1239 let start_time = test_clock.timestamp_ns();
1240 let interval_ns = 1000;
1241 let stop_time = start_time + interval_ns; test_clock
1244 .set_timer_ns(
1245 "exact_stop",
1246 interval_ns,
1247 Some(start_time),
1248 Some(stop_time),
1249 None,
1250 None,
1251 Some(true),
1252 )
1253 .unwrap();
1254
1255 let events = test_clock.advance_time(stop_time, true);
1256
1257 assert_eq!(events.len(), 2);
1259 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1262
1263 #[rstest]
1264 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1265 let start_time = test_clock.timestamp_ns();
1266 let interval_ns = 1000;
1267
1268 test_clock
1269 .set_timer_ns(
1270 "exact_advance",
1271 interval_ns,
1272 Some(start_time),
1273 None,
1274 None,
1275 None,
1276 Some(false),
1277 )
1278 .unwrap();
1279
1280 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1282 let events = test_clock.advance_time(next_time, true);
1283
1284 assert_eq!(events.len(), 1);
1285 assert_eq!(*events[0].ts_event, *next_time);
1286 }
1287
1288 #[rstest]
1289 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1290 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1300 "bar_timer",
1301 interval_ns,
1302 Some(bar_start_time),
1303 None,
1304 None,
1305 Some(false), Some(false), );
1308
1309 assert!(result.is_ok());
1311 assert_eq!(test_clock.timer_count(), 1);
1312
1313 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1315 assert_eq!(*next_time, 101_000);
1316 }
1317
1318 #[rstest]
1319 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1320 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1329 "past_event_timer",
1330 interval_ns,
1331 Some(past_start_time),
1332 None,
1333 None,
1334 Some(false), Some(false), );
1337
1338 assert!(result.is_err());
1340 assert!(
1341 result
1342 .unwrap_err()
1343 .to_string()
1344 .contains("would be in the past")
1345 );
1346 }
1347
1348 #[rstest]
1349 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1350 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1354
1355 let result = test_clock.set_timer_ns(
1358 "immediate_past_timer",
1359 interval_ns,
1360 Some(past_start_time),
1361 None,
1362 None,
1363 Some(false), Some(true), );
1366
1367 assert!(result.is_err());
1369 assert!(
1370 result
1371 .unwrap_err()
1372 .to_string()
1373 .contains("would be in the past")
1374 );
1375 }
1376
1377 #[rstest]
1378 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1379 let start_time = test_clock.timestamp_ns();
1380
1381 test_clock
1382 .set_timer_ns(
1383 "cancel_test",
1384 1000,
1385 Some(start_time),
1386 None,
1387 None,
1388 None,
1389 None,
1390 )
1391 .unwrap();
1392
1393 assert_eq!(test_clock.timer_count(), 1);
1394
1395 test_clock.cancel_timer("cancel_test");
1397
1398 assert_eq!(test_clock.timer_count(), 0);
1399
1400 let events = test_clock.advance_time(start_time + 2000, true);
1402 assert_eq!(events.len(), 0);
1403 }
1404
1405 #[rstest]
1406 fn test_cancel_all_timers(mut test_clock: TestClock) {
1407 test_clock
1409 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1410 .unwrap();
1411 test_clock
1412 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1413 .unwrap();
1414 test_clock
1415 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1416 .unwrap();
1417
1418 assert_eq!(test_clock.timer_count(), 3);
1419
1420 test_clock.cancel_timers();
1422
1423 assert_eq!(test_clock.timer_count(), 0);
1424
1425 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1427 assert_eq!(events.len(), 0);
1428 }
1429
1430 #[rstest]
1431 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1432 test_clock
1433 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1434 .unwrap();
1435
1436 assert_eq!(test_clock.timer_count(), 1);
1437
1438 test_clock.reset();
1440
1441 assert_eq!(test_clock.timer_count(), 0);
1442 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1444
1445 #[rstest]
1446 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1447 let current_time = test_clock.utc_now();
1448 let alert_time = current_time + chrono::Duration::seconds(1);
1449
1450 test_clock
1452 .set_time_alert("alert_test", alert_time, None, None)
1453 .unwrap();
1454
1455 assert_eq!(test_clock.timer_count(), 1);
1456 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1457
1458 let expected_ns = UnixNanos::from(alert_time);
1460 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1461
1462 let diff = if next_time >= expected_ns {
1464 next_time.as_u64() - expected_ns.as_u64()
1465 } else {
1466 expected_ns.as_u64() - next_time.as_u64()
1467 };
1468 assert!(
1469 diff < 1000,
1470 "Timer should be set within 1 microsecond of expected time"
1471 );
1472 }
1473
1474 #[rstest]
1475 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1476 let current_time = test_clock.utc_now();
1477 let start_time = current_time + chrono::Duration::seconds(1);
1478 let interval = Duration::from_millis(500);
1479
1480 test_clock
1482 .set_timer(
1483 "timer_test",
1484 interval,
1485 Some(start_time),
1486 None,
1487 None,
1488 None,
1489 None,
1490 )
1491 .unwrap();
1492
1493 assert_eq!(test_clock.timer_count(), 1);
1494 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1495
1496 let start_ns = UnixNanos::from(start_time);
1498 let interval_ns = interval.as_nanos() as u64;
1499
1500 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1501 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1505 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1506 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1507 }
1508
1509 #[rstest]
1510 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1511 let current_time = test_clock.utc_now();
1512 let start_time = current_time + chrono::Duration::seconds(1);
1513 let stop_time = current_time + chrono::Duration::seconds(3);
1514 let interval = Duration::from_secs(1);
1515
1516 test_clock
1518 .set_timer(
1519 "timer_with_stop",
1520 interval,
1521 Some(start_time),
1522 Some(stop_time),
1523 None,
1524 None,
1525 None,
1526 )
1527 .unwrap();
1528
1529 assert_eq!(test_clock.timer_count(), 1);
1530
1531 let stop_ns = UnixNanos::from(stop_time);
1533 let events = test_clock.advance_time(stop_ns + 1000, true);
1534
1535 assert_eq!(events.len(), 2);
1537
1538 let start_ns = UnixNanos::from(start_time);
1539 let interval_ns = interval.as_nanos() as u64;
1540 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1541 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1542 }
1543
1544 #[rstest]
1545 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1546 let current_time = test_clock.utc_now();
1547 let start_time = current_time + chrono::Duration::seconds(1);
1548 let interval = Duration::from_millis(500);
1549
1550 test_clock
1552 .set_timer(
1553 "immediate_timer",
1554 interval,
1555 Some(start_time),
1556 None,
1557 None,
1558 None,
1559 Some(true),
1560 )
1561 .unwrap();
1562
1563 let start_ns = UnixNanos::from(start_time);
1564 let interval_ns = interval.as_nanos() as u64;
1565
1566 let events = test_clock.advance_time(start_ns + interval_ns, true);
1568
1569 assert_eq!(events.len(), 2);
1571 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1574
1575 #[rstest]
1576 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1577 let current_time = test_clock.timestamp_ns();
1578
1579 test_clock
1581 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1582 .unwrap();
1583
1584 assert_eq!(test_clock.timer_count(), 1);
1585
1586 let events = test_clock.advance_time(current_time, true);
1588
1589 assert_eq!(events.len(), 1);
1591 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1592 assert_eq!(*events[0].ts_event, *current_time);
1593 }
1594}