1use std::{
19 collections::{BTreeMap, BinaryHeap, HashMap},
20 fmt::Debug,
21 ops::Deref,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31 AtomicTime, UnixNanos,
32 correctness::{check_positive_u64, check_predicate_true, check_valid_string_ascii},
33 time::get_atomic_clock_realtime,
34};
35use thousands::Separable;
36use tokio::sync::Mutex;
37use ustr::Ustr;
38
39use crate::{
40 runner::{TimeEventSender, get_time_event_sender},
41 timer::{
42 LiveTimer, ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
43 create_valid_interval,
44 },
45};
46
47pub trait Clock: Debug {
53 fn utc_now(&self) -> DateTime<Utc> {
55 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
56 }
57
58 fn timestamp_ns(&self) -> UnixNanos;
60
61 fn timestamp_us(&self) -> u64;
63
64 fn timestamp_ms(&self) -> u64;
66
67 fn timestamp(&self) -> f64;
69
70 fn timer_names(&self) -> Vec<&str>;
72
73 fn timer_count(&self) -> usize;
75
76 fn timer_exists(&self, name: &Ustr) -> bool;
78
79 fn register_default_handler(&mut self, callback: TimeEventCallback);
82
83 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
87
88 #[allow(clippy::too_many_arguments)]
102 fn set_time_alert(
103 &mut self,
104 name: &str,
105 alert_time: DateTime<Utc>,
106 callback: Option<TimeEventCallback>,
107 allow_past: Option<bool>,
108 ) -> anyhow::Result<()> {
109 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
110 }
111
112 #[allow(clippy::too_many_arguments)]
133 fn set_time_alert_ns(
134 &mut self,
135 name: &str,
136 alert_time_ns: UnixNanos,
137 callback: Option<TimeEventCallback>,
138 allow_past: Option<bool>,
139 ) -> anyhow::Result<()>;
140
141 #[allow(clippy::too_many_arguments)]
157 fn set_timer(
158 &mut self,
159 name: &str,
160 interval: Duration,
161 start_time: Option<DateTime<Utc>>,
162 stop_time: Option<DateTime<Utc>>,
163 callback: Option<TimeEventCallback>,
164 allow_past: Option<bool>,
165 fire_immediately: Option<bool>,
166 ) -> anyhow::Result<()> {
167 self.set_timer_ns(
168 name,
169 interval.as_nanos() as u64,
170 start_time.map(UnixNanos::from),
171 stop_time.map(UnixNanos::from),
172 callback,
173 allow_past,
174 fire_immediately,
175 )
176 }
177
178 #[allow(clippy::too_many_arguments)]
206 fn set_timer_ns(
207 &mut self,
208 name: &str,
209 interval_ns: u64,
210 start_time_ns: Option<UnixNanos>,
211 stop_time_ns: Option<UnixNanos>,
212 callback: Option<TimeEventCallback>,
213 allow_past: Option<bool>,
214 fire_immediately: Option<bool>,
215 ) -> anyhow::Result<()>;
216
217 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
221
222 fn cancel_timer(&mut self, name: &str);
224
225 fn cancel_timers(&mut self);
227
228 fn reset(&mut self);
230}
231
232#[derive(Debug)]
240pub struct TestClock {
241 time: AtomicTime,
242 timers: BTreeMap<Ustr, TestTimer>,
244 default_callback: Option<TimeEventCallback>,
245 callbacks: HashMap<Ustr, TimeEventCallback>,
246 heap: BinaryHeap<ScheduledTimeEvent>, }
248
249impl TestClock {
250 #[must_use]
252 pub fn new() -> Self {
253 Self {
254 time: AtomicTime::new(false, UnixNanos::default()),
255 timers: BTreeMap::new(),
256 default_callback: None,
257 callbacks: HashMap::new(),
258 heap: BinaryHeap::new(),
259 }
260 }
261
262 #[must_use]
264 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
265 &self.timers
266 }
267
268 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
285 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
286
287 let from_time_ns = self.time.get_time_ns();
288
289 assert!(
291 to_time_ns >= from_time_ns,
292 "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
293 from_time_ns
294 );
295
296 if set_time {
297 self.time.set_time(to_time_ns);
298 }
299
300 let mut events: Vec<TimeEvent> = Vec::new();
302 self.timers.retain(|_, timer| {
303 timer.advance(to_time_ns).for_each(|event| {
304 events.push(event);
305 });
306
307 !timer.is_expired()
308 });
309
310 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
311 log::warn!(
312 "Allocated {} time events during clock advancement from {} to {}, \
313 consider stopping the timer between large time ranges with no data points",
314 events.len().separate_with_commas(),
315 from_time_ns,
316 to_time_ns
317 );
318 }
319
320 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
321 events
322 }
323
324 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
338 const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
339
340 let from_time_ns = self.time.get_time_ns();
341
342 assert!(
344 to_time_ns >= from_time_ns,
345 "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
346 from_time_ns
347 );
348
349 self.time.set_time(to_time_ns);
350
351 if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
352 log::warn!(
353 "TestClock heap size {} exceeds recommended limit",
354 self.heap.len()
355 );
356 }
357
358 self.timers.retain(|_, timer| {
360 timer.advance(to_time_ns).for_each(|event| {
361 self.heap.push(ScheduledTimeEvent::new(event));
362 });
363
364 !timer.is_expired()
365 });
366 }
367
368 #[must_use]
378 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
379 events
380 .into_iter()
381 .map(|event| {
382 let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
383 self.default_callback
386 .clone()
387 .expect("Default callback should exist")
388 });
389 TimeEventHandlerV2::new(event, callback)
390 })
391 .collect()
392 }
393}
394
395impl Iterator for TestClock {
396 type Item = TimeEventHandlerV2;
397
398 fn next(&mut self) -> Option<Self::Item> {
399 self.heap
400 .pop()
401 .map(|event| self.get_handler(event.into_inner()))
402 }
403}
404
405impl Default for TestClock {
406 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl Deref for TestClock {
413 type Target = AtomicTime;
414
415 fn deref(&self) -> &Self::Target {
416 &self.time
417 }
418}
419
420impl Clock for TestClock {
421 fn timestamp_ns(&self) -> UnixNanos {
422 self.time.get_time_ns()
423 }
424
425 fn timestamp_us(&self) -> u64 {
426 self.time.get_time_us()
427 }
428
429 fn timestamp_ms(&self) -> u64 {
430 self.time.get_time_ms()
431 }
432
433 fn timestamp(&self) -> f64 {
434 self.time.get_time()
435 }
436
437 fn timer_names(&self) -> Vec<&str> {
438 self.timers
439 .iter()
440 .filter(|(_, timer)| !timer.is_expired())
441 .map(|(k, _)| k.as_str())
442 .collect()
443 }
444
445 fn timer_count(&self) -> usize {
446 self.timers
447 .iter()
448 .filter(|(_, timer)| !timer.is_expired())
449 .count()
450 }
451
452 fn timer_exists(&self, name: &Ustr) -> bool {
453 self.timers.contains_key(name)
454 }
455
456 fn register_default_handler(&mut self, callback: TimeEventCallback) {
457 self.default_callback = Some(callback);
458 }
459
460 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
466 let callback = self
468 .callbacks
469 .get(&event.name)
470 .cloned()
471 .or_else(|| self.default_callback.clone())
472 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
473
474 TimeEventHandlerV2::new(event, callback)
475 }
476
477 fn set_time_alert_ns(
478 &mut self,
479 name: &str,
480 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
482 allow_past: Option<bool>,
483 ) -> anyhow::Result<()> {
484 check_valid_string_ascii(name, stringify!(name))?;
485
486 let name = Ustr::from(name);
487 let allow_past = allow_past.unwrap_or(true);
488
489 if self.timer_exists(&name) {
490 self.cancel_timer(name.as_str());
491 log::warn!("Timer '{name}' replaced");
492 }
493
494 check_predicate_true(
495 callback.is_some()
496 | self.callbacks.contains_key(&name)
497 | self.default_callback.is_some(),
498 "No callbacks provided",
499 )?;
500
501 match callback {
502 Some(callback_py) => self.callbacks.insert(name, callback_py),
503 None => None,
504 };
505
506 let ts_now = self.get_time_ns();
507
508 if alert_time_ns < ts_now {
509 if allow_past {
510 alert_time_ns = ts_now;
511 log::warn!(
512 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
513 alert_time_ns.to_rfc3339(),
514 );
515 } else {
516 anyhow::bail!(
517 "Timer '{name}' alert time {} was in the past (current time is {})",
518 alert_time_ns.to_rfc3339(),
519 ts_now.to_rfc3339(),
520 );
521 }
522 }
523
524 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
526 let fire_immediately = alert_time_ns == ts_now;
528
529 let timer = TestTimer::new(
530 name,
531 interval_ns,
532 ts_now,
533 Some(alert_time_ns),
534 fire_immediately,
535 );
536 self.timers.insert(name, timer);
537
538 Ok(())
539 }
540
541 fn set_timer_ns(
542 &mut self,
543 name: &str,
544 interval_ns: u64,
545 start_time_ns: Option<UnixNanos>,
546 stop_time_ns: Option<UnixNanos>,
547 callback: Option<TimeEventCallback>,
548 allow_past: Option<bool>,
549 fire_immediately: Option<bool>,
550 ) -> anyhow::Result<()> {
551 check_valid_string_ascii(name, stringify!(name))?;
552 check_positive_u64(interval_ns, stringify!(interval_ns))?;
553 check_predicate_true(
554 callback.is_some() | self.default_callback.is_some(),
555 "No callbacks provided",
556 )?;
557
558 let name = Ustr::from(name);
559 let allow_past = allow_past.unwrap_or(true);
560 let fire_immediately = fire_immediately.unwrap_or(false);
561
562 if self.timer_exists(&name) {
563 self.cancel_timer(name.as_str());
564 log::warn!("Timer '{name}' replaced");
565 }
566
567 match callback {
568 Some(callback_py) => self.callbacks.insert(name, callback_py),
569 None => None,
570 };
571
572 let mut start_time_ns = start_time_ns.unwrap_or_default();
573 let ts_now = self.get_time_ns();
574
575 if start_time_ns == 0 {
576 start_time_ns = self.timestamp_ns();
578 } else if !allow_past {
579 let next_event_time = if fire_immediately {
581 start_time_ns
582 } else {
583 start_time_ns + interval_ns
584 };
585
586 if next_event_time < ts_now {
588 anyhow::bail!(
589 "Timer '{name}' next event time {} would be in the past (current time is {})",
590 next_event_time.to_rfc3339(),
591 ts_now.to_rfc3339(),
592 );
593 }
594 }
595
596 if let Some(stop_time) = stop_time_ns {
597 if stop_time <= start_time_ns {
598 anyhow::bail!(
599 "Timer '{name}' stop time {} must be after start time {}",
600 stop_time.to_rfc3339(),
601 start_time_ns.to_rfc3339(),
602 );
603 }
604 if !allow_past && stop_time <= ts_now {
605 anyhow::bail!(
606 "Timer '{name}' stop time {} is in the past (current time is {})",
607 stop_time.to_rfc3339(),
608 ts_now.to_rfc3339(),
609 );
610 }
611 }
612
613 let interval_ns = create_valid_interval(interval_ns);
614
615 let timer = TestTimer::new(
616 name,
617 interval_ns,
618 start_time_ns,
619 stop_time_ns,
620 fire_immediately,
621 );
622 self.timers.insert(name, timer);
623
624 Ok(())
625 }
626
627 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
628 self.timers
629 .get(&Ustr::from(name))
630 .map(|timer| timer.next_time_ns())
631 }
632
633 fn cancel_timer(&mut self, name: &str) {
634 let timer = self.timers.remove(&Ustr::from(name));
635 if let Some(mut timer) = timer {
636 timer.cancel();
637 }
638 }
639
640 fn cancel_timers(&mut self) {
641 for timer in &mut self.timers.values_mut() {
642 timer.cancel();
643 }
644
645 self.timers.clear();
646 }
647
648 fn reset(&mut self) {
649 self.time = AtomicTime::new(false, UnixNanos::default());
650 self.timers = BTreeMap::new();
651 self.heap = BinaryHeap::new();
652 self.callbacks = HashMap::new();
653 }
654}
655
656#[derive(Debug)]
664pub struct LiveClock {
665 time: &'static AtomicTime,
666 timers: HashMap<Ustr, LiveTimer>,
667 default_callback: Option<TimeEventCallback>,
668 callbacks: HashMap<Ustr, TimeEventCallback>,
669 sender: Option<Arc<dyn TimeEventSender>>,
670}
671
672impl LiveClock {
673 #[must_use]
675 pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
676 Self {
677 time: get_atomic_clock_realtime(),
678 timers: HashMap::new(),
679 default_callback: None,
680 callbacks: HashMap::new(),
681 sender,
682 }
683 }
684
685 #[must_use]
686 pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
687 &self.timers
688 }
689
690 fn clear_expired_timers(&mut self) {
692 self.timers.retain(|_, timer| !timer.is_expired());
693 }
694}
695
696impl Default for LiveClock {
697 fn default() -> Self {
699 Self::new(Some(get_time_event_sender()))
700 }
701}
702
703impl Deref for LiveClock {
704 type Target = AtomicTime;
705
706 fn deref(&self) -> &Self::Target {
707 self.time
708 }
709}
710
711impl Clock for LiveClock {
712 fn timestamp_ns(&self) -> UnixNanos {
713 self.time.get_time_ns()
714 }
715
716 fn timestamp_us(&self) -> u64 {
717 self.time.get_time_us()
718 }
719
720 fn timestamp_ms(&self) -> u64 {
721 self.time.get_time_ms()
722 }
723
724 fn timestamp(&self) -> f64 {
725 self.time.get_time()
726 }
727
728 fn timer_names(&self) -> Vec<&str> {
729 self.timers
730 .iter()
731 .filter(|(_, timer)| !timer.is_expired())
732 .map(|(k, _)| k.as_str())
733 .collect()
734 }
735
736 fn timer_count(&self) -> usize {
737 self.timers
738 .iter()
739 .filter(|(_, timer)| !timer.is_expired())
740 .count()
741 }
742
743 fn timer_exists(&self, name: &Ustr) -> bool {
744 self.timers.contains_key(name)
745 }
746
747 fn register_default_handler(&mut self, handler: TimeEventCallback) {
748 self.default_callback = Some(handler);
749 }
750
751 #[allow(unused_variables)]
756 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
757 let callback = self
759 .callbacks
760 .get(&event.name)
761 .cloned()
762 .or_else(|| self.default_callback.clone())
763 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
764
765 TimeEventHandlerV2::new(event, callback)
766 }
767
768 fn set_time_alert_ns(
769 &mut self,
770 name: &str,
771 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
773 allow_past: Option<bool>,
774 ) -> anyhow::Result<()> {
775 check_valid_string_ascii(name, stringify!(name))?;
776
777 let name = Ustr::from(name);
778 let allow_past = allow_past.unwrap_or(true);
779
780 if self.timer_exists(&name) {
781 self.cancel_timer(name.as_str());
782 log::warn!("Timer '{name}' replaced");
783 }
784
785 check_predicate_true(
786 callback.is_some()
787 | self.callbacks.contains_key(&name)
788 | self.default_callback.is_some(),
789 "No callbacks provided",
790 )?;
791
792 let callback = if let Some(callback) = callback {
793 self.callbacks.insert(name, callback.clone());
794 callback
795 } else if let Some(existing) = self.callbacks.get(&name) {
796 existing.clone()
797 } else {
798 let default = self
799 .default_callback
800 .clone()
801 .expect("Default callback should exist");
802 self.callbacks.insert(name, default.clone());
803 default
804 };
805
806 let ts_now = self.get_time_ns();
807
808 if alert_time_ns < ts_now {
810 if allow_past {
811 alert_time_ns = ts_now;
812 log::warn!(
813 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
814 alert_time_ns.to_rfc3339(),
815 );
816 } else {
817 anyhow::bail!(
818 "Timer '{name}' alert time {} was in the past (current time is {})",
819 alert_time_ns.to_rfc3339(),
820 ts_now.to_rfc3339(),
821 );
822 }
823 }
824
825 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
827
828 let mut timer = LiveTimer::new(
829 name,
830 interval_ns,
831 ts_now,
832 Some(alert_time_ns),
833 callback,
834 false,
835 self.sender.clone(),
836 );
837
838 timer.start();
839
840 self.clear_expired_timers();
841 self.timers.insert(name, timer);
842
843 Ok(())
844 }
845
846 fn set_timer_ns(
847 &mut self,
848 name: &str,
849 interval_ns: u64,
850 start_time_ns: Option<UnixNanos>,
851 stop_time_ns: Option<UnixNanos>,
852 callback: Option<TimeEventCallback>,
853 allow_past: Option<bool>,
854 fire_immediately: Option<bool>,
855 ) -> anyhow::Result<()> {
856 check_valid_string_ascii(name, stringify!(name))?;
857 check_positive_u64(interval_ns, stringify!(interval_ns))?;
858 check_predicate_true(
859 callback.is_some() | self.default_callback.is_some(),
860 "No callbacks provided",
861 )?;
862
863 let name = Ustr::from(name);
864 let allow_past = allow_past.unwrap_or(true);
865 let fire_immediately = fire_immediately.unwrap_or(false);
866
867 if self.timer_exists(&name) {
868 self.cancel_timer(name.as_str());
869 log::warn!("Timer '{name}' replaced");
870 }
871
872 let callback = match callback {
873 Some(callback) => callback,
874 None => self.default_callback.clone().unwrap(),
875 };
876
877 self.callbacks.insert(name, callback.clone());
878
879 let mut start_time_ns = start_time_ns.unwrap_or_default();
880 let ts_now = self.get_time_ns();
881
882 if start_time_ns == 0 {
883 start_time_ns = self.timestamp_ns();
885 } else if start_time_ns < ts_now && !allow_past {
886 anyhow::bail!(
887 "Timer '{name}' start time {} was in the past (current time is {})",
888 start_time_ns.to_rfc3339(),
889 ts_now.to_rfc3339(),
890 );
891 }
892
893 if let Some(stop_time) = stop_time_ns {
894 if stop_time <= start_time_ns {
895 anyhow::bail!(
896 "Timer '{name}' stop time {} must be after start time {}",
897 stop_time.to_rfc3339(),
898 start_time_ns.to_rfc3339(),
899 );
900 }
901 if !allow_past && stop_time <= ts_now {
902 anyhow::bail!(
903 "Timer '{name}' stop time {} is in the past (current time is {})",
904 stop_time.to_rfc3339(),
905 ts_now.to_rfc3339(),
906 );
907 }
908 }
909
910 let interval_ns = create_valid_interval(interval_ns);
911
912 let mut timer = LiveTimer::new(
913 name,
914 interval_ns,
915 start_time_ns,
916 stop_time_ns,
917 callback,
918 fire_immediately,
919 self.sender.clone(),
920 );
921 timer.start();
922
923 self.clear_expired_timers();
924 self.timers.insert(name, timer);
925
926 Ok(())
927 }
928
929 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
930 self.timers
931 .get(&Ustr::from(name))
932 .map(|timer| timer.next_time_ns())
933 }
934
935 fn cancel_timer(&mut self, name: &str) {
936 let timer = self.timers.remove(&Ustr::from(name));
937 if let Some(mut timer) = timer {
938 timer.cancel();
939 }
940 }
941
942 fn cancel_timers(&mut self) {
943 for timer in &mut self.timers.values_mut() {
944 timer.cancel();
945 }
946
947 self.timers.clear();
948 }
949
950 fn reset(&mut self) {
951 self.cancel_timers();
952 self.callbacks.clear();
953 }
954}
955
956#[derive(Debug)]
958pub struct TimeEventStream {
959 heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>,
960}
961
962impl TimeEventStream {
963 pub const fn new(heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
964 Self { heap }
965 }
966}
967
968impl Stream for TimeEventStream {
969 type Item = TimeEvent;
970
971 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972 let mut heap = match self.heap.try_lock() {
973 Ok(guard) => guard,
974 Err(e) => {
975 tracing::error!("Unable to get LiveClock heap lock: {e}");
976 cx.waker().wake_by_ref();
977 return Poll::Pending;
978 }
979 };
980
981 if let Some(event) = heap.pop() {
982 Poll::Ready(Some(event.into_inner()))
983 } else {
984 cx.waker().wake_by_ref();
985 Poll::Pending
986 }
987 }
988}
989
990#[cfg(test)]
995mod tests {
996 use std::{
997 sync::{Arc, Mutex},
998 time::Duration,
999 };
1000
1001 use nautilus_core::{MUTEX_POISONED, time::get_atomic_clock_realtime};
1002 use rstest::{fixture, rstest};
1003 use ustr::Ustr;
1004
1005 use super::*;
1006 use crate::{runner::TimeEventSender, testing::wait_until};
1007
1008 #[derive(Debug, Default)]
1009 struct TestCallback {
1010 called: Arc<Mutex<bool>>,
1012 }
1013
1014 impl TestCallback {
1015 fn new(called: Arc<Mutex<bool>>) -> Self {
1016 Self { called }
1017 }
1018 }
1019
1020 impl From<TestCallback> for TimeEventCallback {
1021 fn from(callback: TestCallback) -> Self {
1022 Self::from(move |_event: TimeEvent| {
1023 if let Ok(mut called) = callback.called.lock() {
1024 *called = true;
1025 }
1026 })
1027 }
1028 }
1029
1030 #[derive(Debug)]
1031 struct CollectingSender {
1032 events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1033 }
1034
1035 impl CollectingSender {
1036 fn new(events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>) -> Self {
1037 Self { events }
1038 }
1039 }
1040
1041 impl TimeEventSender for CollectingSender {
1042 fn send(&self, handler: TimeEventHandlerV2) {
1043 let TimeEventHandlerV2 { event, callback } = handler;
1044 let now_ns = get_atomic_clock_realtime().get_time_ns();
1045 let event_clone = event.clone();
1046 callback.call(event);
1047 self.events
1048 .lock()
1049 .expect(MUTEX_POISONED)
1050 .push((event_clone, now_ns));
1051 }
1052 }
1053
1054 fn wait_for_events(
1055 events: &Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1056 target: usize,
1057 timeout: Duration,
1058 ) {
1059 wait_until(
1060 || events.lock().expect(MUTEX_POISONED).len() >= target,
1061 timeout,
1062 );
1063 }
1064
1065 #[fixture]
1066 pub fn test_clock() -> TestClock {
1067 let mut clock = TestClock::new();
1068 clock.register_default_handler(TestCallback::default().into());
1069 clock
1070 }
1071
1072 #[rstest]
1073 fn test_time_monotonicity(mut test_clock: TestClock) {
1074 let initial_time = test_clock.timestamp_ns();
1075 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
1076 assert!(test_clock.timestamp_ns() > initial_time);
1077 }
1078
1079 #[rstest]
1080 fn test_timer_registration(mut test_clock: TestClock) {
1081 test_clock
1082 .set_time_alert_ns(
1083 "test_timer",
1084 (*test_clock.timestamp_ns() + 1000).into(),
1085 None,
1086 None,
1087 )
1088 .unwrap();
1089 assert_eq!(test_clock.timer_count(), 1);
1090 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
1091 }
1092
1093 #[rstest]
1094 fn test_timer_expiration(mut test_clock: TestClock) {
1095 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
1096 test_clock
1097 .set_time_alert_ns("test_timer", alert_time, None, None)
1098 .unwrap();
1099 let events = test_clock.advance_time(alert_time, true);
1100 assert_eq!(events.len(), 1);
1101 assert_eq!(events[0].name.as_str(), "test_timer");
1102 }
1103
1104 #[rstest]
1105 fn test_timer_cancellation(mut test_clock: TestClock) {
1106 test_clock
1107 .set_time_alert_ns(
1108 "test_timer",
1109 (*test_clock.timestamp_ns() + 1000).into(),
1110 None,
1111 None,
1112 )
1113 .unwrap();
1114 assert_eq!(test_clock.timer_count(), 1);
1115 test_clock.cancel_timer("test_timer");
1116 assert_eq!(test_clock.timer_count(), 0);
1117 }
1118
1119 #[rstest]
1120 fn test_time_advancement(mut test_clock: TestClock) {
1121 let start_time = test_clock.timestamp_ns();
1122 test_clock
1123 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
1124 .unwrap();
1125 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
1126 assert_eq!(events.len(), 2);
1127 assert_eq!(*events[0].ts_event, *start_time + 1000);
1128 assert_eq!(*events[1].ts_event, *start_time + 2000);
1129 }
1130
1131 #[rstest]
1132 fn test_default_and_custom_callbacks() {
1133 let mut clock = TestClock::new();
1134 let default_called = Arc::new(Mutex::new(false));
1135 let custom_called = Arc::new(Mutex::new(false));
1136
1137 let default_callback = TestCallback::new(Arc::clone(&default_called));
1138 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
1139
1140 clock.register_default_handler(TimeEventCallback::from(default_callback));
1141 clock
1142 .set_time_alert_ns(
1143 "default_timer",
1144 (*clock.timestamp_ns() + 1000).into(),
1145 None,
1146 None,
1147 )
1148 .unwrap();
1149 clock
1150 .set_time_alert_ns(
1151 "custom_timer",
1152 (*clock.timestamp_ns() + 1000).into(),
1153 Some(TimeEventCallback::from(custom_callback)),
1154 None,
1155 )
1156 .unwrap();
1157
1158 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
1159 let handlers = clock.match_handlers(events);
1160
1161 for handler in handlers {
1162 handler.callback.call(handler.event);
1163 }
1164
1165 assert!(*default_called.lock().expect(MUTEX_POISONED));
1166 assert!(*custom_called.lock().expect(MUTEX_POISONED));
1167 }
1168
1169 #[rstest]
1170 fn test_multiple_timers(mut test_clock: TestClock) {
1171 let start_time = test_clock.timestamp_ns();
1172 test_clock
1173 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1174 .unwrap();
1175 test_clock
1176 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1177 .unwrap();
1178 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
1179 assert_eq!(events.len(), 3);
1180 assert_eq!(events[0].name.as_str(), "timer1");
1181 assert_eq!(events[1].name.as_str(), "timer1");
1182 assert_eq!(events[2].name.as_str(), "timer2");
1183 }
1184
1185 #[rstest]
1186 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1187 test_clock.set_time(UnixNanos::from(2000));
1188 let current_time = test_clock.timestamp_ns();
1189 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1190
1191 test_clock
1193 .set_time_alert_ns("past_timer", past_time, None, Some(true))
1194 .unwrap();
1195
1196 assert_eq!(test_clock.timer_count(), 1);
1198 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1199
1200 let next_time = test_clock.next_time_ns("past_timer").unwrap();
1202 assert!(next_time >= current_time);
1203 }
1204
1205 #[rstest]
1206 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1207 test_clock.set_time(UnixNanos::from(2000));
1208 let current_time = test_clock.timestamp_ns();
1209 let past_time = current_time - 1000;
1210
1211 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1213
1214 assert!(result.is_err());
1216 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1217
1218 assert_eq!(test_clock.timer_count(), 0);
1220 assert!(test_clock.timer_names().is_empty());
1221 }
1222
1223 #[rstest]
1224 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1225 test_clock.set_time(UnixNanos::from(2000));
1226 let current_time = test_clock.timestamp_ns();
1227 let start_time = current_time + 1000;
1228 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
1232 "invalid_timer",
1233 100,
1234 Some(start_time),
1235 Some(stop_time),
1236 None,
1237 None,
1238 None,
1239 );
1240
1241 assert!(result.is_err());
1243 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1244
1245 assert_eq!(test_clock.timer_count(), 0);
1247 }
1248
1249 #[rstest]
1250 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1251 let start_time = test_clock.timestamp_ns();
1252 let interval_ns = 1000;
1253
1254 test_clock
1255 .set_timer_ns(
1256 "fire_immediately_timer",
1257 interval_ns,
1258 Some(start_time),
1259 None,
1260 None,
1261 None,
1262 Some(true),
1263 )
1264 .unwrap();
1265
1266 let events = test_clock.advance_time(start_time + 2500, true);
1268
1269 assert_eq!(events.len(), 3);
1271 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
1275
1276 #[rstest]
1277 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1278 let start_time = test_clock.timestamp_ns();
1279 let interval_ns = 1000;
1280
1281 test_clock
1282 .set_timer_ns(
1283 "normal_timer",
1284 interval_ns,
1285 Some(start_time),
1286 None,
1287 None,
1288 None,
1289 Some(false),
1290 )
1291 .unwrap();
1292
1293 let events = test_clock.advance_time(start_time + 2500, true);
1295
1296 assert_eq!(events.len(), 2);
1298 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1301
1302 #[rstest]
1303 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1304 let start_time = test_clock.timestamp_ns();
1305 let interval_ns = 1000;
1306
1307 test_clock
1309 .set_timer_ns(
1310 "default_timer",
1311 interval_ns,
1312 Some(start_time),
1313 None,
1314 None,
1315 None,
1316 None,
1317 )
1318 .unwrap();
1319
1320 let events = test_clock.advance_time(start_time + 1500, true);
1321
1322 assert_eq!(events.len(), 1);
1324 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1326
1327 #[rstest]
1328 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1329 test_clock.set_time(5000.into());
1330 let interval_ns = 1000;
1331
1332 test_clock
1333 .set_timer_ns(
1334 "zero_start_timer",
1335 interval_ns,
1336 None,
1337 None,
1338 None,
1339 None,
1340 Some(true),
1341 )
1342 .unwrap();
1343
1344 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1345
1346 assert_eq!(events.len(), 3);
1349 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1351 assert_eq!(*events[2].ts_event, 7000);
1352 }
1353
1354 #[rstest]
1355 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1356 let start_time = test_clock.timestamp_ns();
1357 let interval_ns = 1000;
1358
1359 test_clock
1361 .set_timer_ns(
1362 "immediate_timer",
1363 interval_ns,
1364 Some(start_time),
1365 None,
1366 None,
1367 None,
1368 Some(true),
1369 )
1370 .unwrap();
1371
1372 test_clock
1374 .set_timer_ns(
1375 "normal_timer",
1376 interval_ns,
1377 Some(start_time),
1378 None,
1379 None,
1380 None,
1381 Some(false),
1382 )
1383 .unwrap();
1384
1385 let events = test_clock.advance_time(start_time + 1500, true);
1386
1387 assert_eq!(events.len(), 3);
1389
1390 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1392 event_times.sort();
1393
1394 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1398
1399 #[rstest]
1400 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1401 let start_time = test_clock.timestamp_ns();
1402
1403 test_clock
1405 .set_timer_ns(
1406 "collision_timer",
1407 1000,
1408 Some(start_time),
1409 None,
1410 None,
1411 None,
1412 None,
1413 )
1414 .unwrap();
1415
1416 let result = test_clock.set_timer_ns(
1418 "collision_timer",
1419 2000,
1420 Some(start_time),
1421 None,
1422 None,
1423 None,
1424 None,
1425 );
1426
1427 assert!(result.is_ok());
1428 assert_eq!(test_clock.timer_count(), 1);
1430
1431 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1433 assert_eq!(next_time, start_time + 2000);
1435 }
1436
1437 #[rstest]
1438 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1439 let start_time = test_clock.timestamp_ns();
1440
1441 let result =
1443 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1444
1445 assert!(result.is_err());
1446 assert_eq!(test_clock.timer_count(), 0);
1447 }
1448
1449 #[rstest]
1450 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1451 let start_time = test_clock.timestamp_ns();
1452
1453 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1455
1456 assert!(result.is_err());
1457 assert_eq!(test_clock.timer_count(), 0);
1458 }
1459
1460 #[rstest]
1461 fn test_timer_exists(mut test_clock: TestClock) {
1462 let name = Ustr::from("exists_timer");
1463 assert!(!test_clock.timer_exists(&name));
1464
1465 test_clock
1466 .set_time_alert_ns(
1467 name.as_str(),
1468 (*test_clock.timestamp_ns() + 1_000).into(),
1469 None,
1470 None,
1471 )
1472 .unwrap();
1473
1474 assert!(test_clock.timer_exists(&name));
1475 }
1476
1477 #[rstest]
1478 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1479 test_clock.set_time(UnixNanos::from(10_000));
1480 let current = test_clock.timestamp_ns();
1481
1482 let result = test_clock.set_timer_ns(
1483 "past_stop",
1484 10_000,
1485 Some(current - 500),
1486 Some(current - 100),
1487 None,
1488 Some(false),
1489 None,
1490 );
1491
1492 let err = result.expect_err("expected stop time validation error");
1493 let err_msg = err.to_string();
1494 assert!(err_msg.contains("stop time"));
1495 assert!(err_msg.contains("in the past"));
1496 }
1497
1498 #[rstest]
1499 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1500 let current = test_clock.timestamp_ns();
1501
1502 let result = test_clock.set_timer_ns(
1503 "future_stop",
1504 1_000,
1505 Some(current),
1506 Some(current + 10_000),
1507 None,
1508 Some(false),
1509 None,
1510 );
1511
1512 assert!(result.is_ok());
1513 }
1514
1515 #[rstest]
1516 fn test_live_clock_timer_replacement_cancels_previous_task() {
1517 let events = Arc::new(Mutex::new(Vec::new()));
1518 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1519
1520 let mut clock = LiveClock::new(Some(sender));
1521 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1522
1523 let fast_interval = Duration::from_millis(10).as_nanos() as u64;
1524 clock
1525 .set_timer_ns("replace", fast_interval, None, None, None, None, None)
1526 .unwrap();
1527
1528 wait_for_events(&events, 2, Duration::from_millis(200));
1529 events.lock().expect(MUTEX_POISONED).clear();
1530
1531 let slow_interval = Duration::from_millis(30).as_nanos() as u64;
1532 clock
1533 .set_timer_ns("replace", slow_interval, None, None, None, None, None)
1534 .unwrap();
1535
1536 wait_for_events(&events, 3, Duration::from_millis(300));
1537
1538 let snapshot = events.lock().expect(MUTEX_POISONED).clone();
1539 let diffs: Vec<u64> = snapshot
1540 .windows(2)
1541 .map(|pair| pair[1].0.ts_event.as_u64() - pair[0].0.ts_event.as_u64())
1542 .collect();
1543
1544 assert!(!diffs.is_empty());
1545 for diff in diffs {
1546 assert_ne!(diff, fast_interval);
1547 }
1548
1549 clock.cancel_timers();
1550 }
1551
1552 #[rstest]
1553 fn test_live_clock_time_alert_persists_callback() {
1554 let events = Arc::new(Mutex::new(Vec::new()));
1555 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1556
1557 let mut clock = LiveClock::new(Some(sender));
1558 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1559
1560 let now = clock.timestamp_ns();
1561 let alert_time = now + 1_000_u64;
1562
1563 clock
1564 .set_time_alert_ns("alert-callback", alert_time, None, None)
1565 .unwrap();
1566
1567 assert!(clock.callbacks.contains_key(&Ustr::from("alert-callback")));
1568
1569 clock.cancel_timers();
1570 }
1571
1572 #[rstest]
1573 fn test_live_clock_reset_stops_active_timers() {
1574 let events = Arc::new(Mutex::new(Vec::new()));
1575 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1576
1577 let mut clock = LiveClock::new(Some(sender));
1578 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1579
1580 clock
1581 .set_timer_ns(
1582 "reset-test",
1583 Duration::from_millis(15).as_nanos() as u64,
1584 None,
1585 None,
1586 None,
1587 None,
1588 None,
1589 )
1590 .unwrap();
1591
1592 wait_for_events(&events, 2, Duration::from_millis(250));
1593
1594 clock.reset();
1595
1596 let start = std::time::Instant::now();
1598 wait_until(
1599 || start.elapsed() >= Duration::from_millis(50),
1600 Duration::from_secs(2),
1601 );
1602
1603 events.lock().expect(MUTEX_POISONED).clear();
1605
1606 let start = std::time::Instant::now();
1608 wait_until(
1609 || start.elapsed() >= Duration::from_millis(50),
1610 Duration::from_secs(2),
1611 );
1612 assert!(events.lock().expect(MUTEX_POISONED).is_empty());
1613 }
1614
1615 #[rstest]
1616 fn test_live_timer_short_delay_not_early() {
1617 let events = Arc::new(Mutex::new(Vec::new()));
1618 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1619
1620 let mut clock = LiveClock::new(Some(sender));
1621 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1622
1623 let now = clock.timestamp_ns();
1624 let start_time = UnixNanos::from(*now + 500_000); let interval_ns = 1_000_000;
1626
1627 clock
1628 .set_timer_ns(
1629 "short-delay",
1630 interval_ns,
1631 Some(start_time),
1632 None,
1633 None,
1634 None,
1635 Some(true),
1636 )
1637 .unwrap();
1638
1639 wait_for_events(&events, 1, Duration::from_millis(100));
1640
1641 let snapshot = events.lock().expect(MUTEX_POISONED).clone();
1642 assert!(!snapshot.is_empty());
1643
1644 for (event, actual_ts) in &snapshot {
1645 assert!(actual_ts.as_u64() >= event.ts_event.as_u64());
1646 }
1647
1648 clock.cancel_timers();
1649 }
1650
1651 #[rstest]
1652 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1653 let start_time = test_clock.timestamp_ns();
1654 let interval_ns = 1000;
1655 let stop_time = start_time + interval_ns; test_clock
1658 .set_timer_ns(
1659 "exact_stop",
1660 interval_ns,
1661 Some(start_time),
1662 Some(stop_time),
1663 None,
1664 None,
1665 Some(true),
1666 )
1667 .unwrap();
1668
1669 let events = test_clock.advance_time(stop_time, true);
1670
1671 assert_eq!(events.len(), 2);
1673 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1676
1677 #[rstest]
1678 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1679 let start_time = test_clock.timestamp_ns();
1680 let interval_ns = 1000;
1681
1682 test_clock
1683 .set_timer_ns(
1684 "exact_advance",
1685 interval_ns,
1686 Some(start_time),
1687 None,
1688 None,
1689 None,
1690 Some(false),
1691 )
1692 .unwrap();
1693
1694 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1696 let events = test_clock.advance_time(next_time, true);
1697
1698 assert_eq!(events.len(), 1);
1699 assert_eq!(*events[0].ts_event, *next_time);
1700 }
1701
1702 #[rstest]
1703 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1704 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1714 "bar_timer",
1715 interval_ns,
1716 Some(bar_start_time),
1717 None,
1718 None,
1719 Some(false), Some(false), );
1722
1723 assert!(result.is_ok());
1725 assert_eq!(test_clock.timer_count(), 1);
1726
1727 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1729 assert_eq!(*next_time, 101_000);
1730 }
1731
1732 #[rstest]
1733 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1734 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1743 "past_event_timer",
1744 interval_ns,
1745 Some(past_start_time),
1746 None,
1747 None,
1748 Some(false), Some(false), );
1751
1752 assert!(result.is_err());
1754 assert!(
1755 result
1756 .unwrap_err()
1757 .to_string()
1758 .contains("would be in the past")
1759 );
1760 }
1761
1762 #[rstest]
1763 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1764 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1768
1769 let result = test_clock.set_timer_ns(
1772 "immediate_past_timer",
1773 interval_ns,
1774 Some(past_start_time),
1775 None,
1776 None,
1777 Some(false), Some(true), );
1780
1781 assert!(result.is_err());
1783 assert!(
1784 result
1785 .unwrap_err()
1786 .to_string()
1787 .contains("would be in the past")
1788 );
1789 }
1790
1791 #[rstest]
1792 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1793 let start_time = test_clock.timestamp_ns();
1794
1795 test_clock
1796 .set_timer_ns(
1797 "cancel_test",
1798 1000,
1799 Some(start_time),
1800 None,
1801 None,
1802 None,
1803 None,
1804 )
1805 .unwrap();
1806
1807 assert_eq!(test_clock.timer_count(), 1);
1808
1809 test_clock.cancel_timer("cancel_test");
1811
1812 assert_eq!(test_clock.timer_count(), 0);
1813
1814 let events = test_clock.advance_time(start_time + 2000, true);
1816 assert_eq!(events.len(), 0);
1817 }
1818
1819 #[rstest]
1820 fn test_cancel_all_timers(mut test_clock: TestClock) {
1821 test_clock
1823 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1824 .unwrap();
1825 test_clock
1826 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1827 .unwrap();
1828 test_clock
1829 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1830 .unwrap();
1831
1832 assert_eq!(test_clock.timer_count(), 3);
1833
1834 test_clock.cancel_timers();
1836
1837 assert_eq!(test_clock.timer_count(), 0);
1838
1839 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1841 assert_eq!(events.len(), 0);
1842 }
1843
1844 #[rstest]
1845 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1846 test_clock
1847 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1848 .unwrap();
1849
1850 assert_eq!(test_clock.timer_count(), 1);
1851
1852 test_clock.reset();
1854
1855 assert_eq!(test_clock.timer_count(), 0);
1856 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1858
1859 #[rstest]
1860 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1861 let current_time = test_clock.utc_now();
1862 let alert_time = current_time + chrono::Duration::seconds(1);
1863
1864 test_clock
1866 .set_time_alert("alert_test", alert_time, None, None)
1867 .unwrap();
1868
1869 assert_eq!(test_clock.timer_count(), 1);
1870 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1871
1872 let expected_ns = UnixNanos::from(alert_time);
1874 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1875
1876 let diff = if next_time >= expected_ns {
1878 next_time.as_u64() - expected_ns.as_u64()
1879 } else {
1880 expected_ns.as_u64() - next_time.as_u64()
1881 };
1882 assert!(
1883 diff < 1000,
1884 "Timer should be set within 1 microsecond of expected time"
1885 );
1886 }
1887
1888 #[rstest]
1889 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1890 let current_time = test_clock.utc_now();
1891 let start_time = current_time + chrono::Duration::seconds(1);
1892 let interval = Duration::from_millis(500);
1893
1894 test_clock
1896 .set_timer(
1897 "timer_test",
1898 interval,
1899 Some(start_time),
1900 None,
1901 None,
1902 None,
1903 None,
1904 )
1905 .unwrap();
1906
1907 assert_eq!(test_clock.timer_count(), 1);
1908 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1909
1910 let start_ns = UnixNanos::from(start_time);
1912 let interval_ns = interval.as_nanos() as u64;
1913
1914 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1915 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1919 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1920 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1921 }
1922
1923 #[rstest]
1924 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1925 let current_time = test_clock.utc_now();
1926 let start_time = current_time + chrono::Duration::seconds(1);
1927 let stop_time = current_time + chrono::Duration::seconds(3);
1928 let interval = Duration::from_secs(1);
1929
1930 test_clock
1932 .set_timer(
1933 "timer_with_stop",
1934 interval,
1935 Some(start_time),
1936 Some(stop_time),
1937 None,
1938 None,
1939 None,
1940 )
1941 .unwrap();
1942
1943 assert_eq!(test_clock.timer_count(), 1);
1944
1945 let stop_ns = UnixNanos::from(stop_time);
1947 let events = test_clock.advance_time(stop_ns + 1000, true);
1948
1949 assert_eq!(events.len(), 2);
1951
1952 let start_ns = UnixNanos::from(start_time);
1953 let interval_ns = interval.as_nanos() as u64;
1954 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1955 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1956 }
1957
1958 #[rstest]
1959 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1960 let current_time = test_clock.utc_now();
1961 let start_time = current_time + chrono::Duration::seconds(1);
1962 let interval = Duration::from_millis(500);
1963
1964 test_clock
1966 .set_timer(
1967 "immediate_timer",
1968 interval,
1969 Some(start_time),
1970 None,
1971 None,
1972 None,
1973 Some(true),
1974 )
1975 .unwrap();
1976
1977 let start_ns = UnixNanos::from(start_time);
1978 let interval_ns = interval.as_nanos() as u64;
1979
1980 let events = test_clock.advance_time(start_ns + interval_ns, true);
1982
1983 assert_eq!(events.len(), 2);
1985 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1988
1989 #[rstest]
1990 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1991 let current_time = test_clock.timestamp_ns();
1992
1993 test_clock
1995 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1996 .unwrap();
1997
1998 assert_eq!(test_clock.timer_count(), 1);
1999
2000 let events = test_clock.advance_time(current_time, true);
2002
2003 assert_eq!(events.len(), 1);
2005 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
2006 assert_eq!(*events[0].ts_event, *current_time);
2007 }
2008}