1use std::{
19 collections::{BTreeMap, BinaryHeap, HashMap},
20 fmt::Debug,
21 ops::Deref,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31 AtomicTime, UnixNanos,
32 correctness::{check_positive_u64, check_predicate_true, check_valid_string_ascii},
33 time::get_atomic_clock_realtime,
34};
35use thousands::Separable;
36use tokio::sync::Mutex;
37use ustr::Ustr;
38
39use crate::{
40 runner::{TimeEventSender, get_time_event_sender},
41 timer::{
42 LiveTimer, ScheduledTimeEvent, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
43 create_valid_interval,
44 },
45};
46
47pub trait Clock: Debug {
53 fn utc_now(&self) -> DateTime<Utc> {
55 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
56 }
57
58 fn timestamp_ns(&self) -> UnixNanos;
60
61 fn timestamp_us(&self) -> u64;
63
64 fn timestamp_ms(&self) -> u64;
66
67 fn timestamp(&self) -> f64;
69
70 fn timer_names(&self) -> Vec<&str>;
72
73 fn timer_count(&self) -> usize;
75
76 fn timer_exists(&self, name: &Ustr) -> bool;
78
79 fn register_default_handler(&mut self, callback: TimeEventCallback);
82
83 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
87
88 #[allow(clippy::too_many_arguments)]
102 fn set_time_alert(
103 &mut self,
104 name: &str,
105 alert_time: DateTime<Utc>,
106 callback: Option<TimeEventCallback>,
107 allow_past: Option<bool>,
108 ) -> anyhow::Result<()> {
109 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
110 }
111
112 #[allow(clippy::too_many_arguments)]
133 fn set_time_alert_ns(
134 &mut self,
135 name: &str,
136 alert_time_ns: UnixNanos,
137 callback: Option<TimeEventCallback>,
138 allow_past: Option<bool>,
139 ) -> anyhow::Result<()>;
140
141 #[allow(clippy::too_many_arguments)]
157 fn set_timer(
158 &mut self,
159 name: &str,
160 interval: Duration,
161 start_time: Option<DateTime<Utc>>,
162 stop_time: Option<DateTime<Utc>>,
163 callback: Option<TimeEventCallback>,
164 allow_past: Option<bool>,
165 fire_immediately: Option<bool>,
166 ) -> anyhow::Result<()> {
167 self.set_timer_ns(
168 name,
169 interval.as_nanos() as u64,
170 start_time.map(UnixNanos::from),
171 stop_time.map(UnixNanos::from),
172 callback,
173 allow_past,
174 fire_immediately,
175 )
176 }
177
178 #[allow(clippy::too_many_arguments)]
206 fn set_timer_ns(
207 &mut self,
208 name: &str,
209 interval_ns: u64,
210 start_time_ns: Option<UnixNanos>,
211 stop_time_ns: Option<UnixNanos>,
212 callback: Option<TimeEventCallback>,
213 allow_past: Option<bool>,
214 fire_immediately: Option<bool>,
215 ) -> anyhow::Result<()>;
216
217 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
221
222 fn cancel_timer(&mut self, name: &str);
224
225 fn cancel_timers(&mut self);
227
228 fn reset(&mut self);
230}
231
232#[derive(Debug)]
240pub struct TestClock {
241 time: AtomicTime,
242 timers: BTreeMap<Ustr, TestTimer>,
244 default_callback: Option<TimeEventCallback>,
245 callbacks: HashMap<Ustr, TimeEventCallback>,
246 heap: BinaryHeap<ScheduledTimeEvent>, }
248
249impl TestClock {
250 #[must_use]
252 pub fn new() -> Self {
253 Self {
254 time: AtomicTime::new(false, UnixNanos::default()),
255 timers: BTreeMap::new(),
256 default_callback: None,
257 callbacks: HashMap::new(),
258 heap: BinaryHeap::new(),
259 }
260 }
261
262 #[must_use]
264 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
265 &self.timers
266 }
267
268 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
285 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
286
287 let from_time_ns = self.time.get_time_ns();
288
289 assert!(
291 to_time_ns >= from_time_ns,
292 "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
293 from_time_ns
294 );
295
296 if set_time {
297 self.time.set_time(to_time_ns);
298 }
299
300 let mut events: Vec<TimeEvent> = Vec::new();
302 self.timers.retain(|_, timer| {
303 timer.advance(to_time_ns).for_each(|event| {
304 events.push(event);
305 });
306
307 !timer.is_expired()
308 });
309
310 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
311 log::warn!(
312 "Allocated {} time events during clock advancement from {} to {}, \
313 consider stopping the timer between large time ranges with no data points",
314 events.len().separate_with_commas(),
315 from_time_ns,
316 to_time_ns
317 );
318 }
319
320 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
321 events
322 }
323
324 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
338 const WARN_HEAP_SIZE_THRESHOLD: usize = 100_000;
339
340 let from_time_ns = self.time.get_time_ns();
341
342 assert!(
344 to_time_ns >= from_time_ns,
345 "`to_time_ns` {to_time_ns} was < `from_time_ns` {}",
346 from_time_ns
347 );
348
349 self.time.set_time(to_time_ns);
350
351 if self.heap.len() > WARN_HEAP_SIZE_THRESHOLD {
352 log::warn!(
353 "TestClock heap size {} exceeds recommended limit",
354 self.heap.len()
355 );
356 }
357
358 self.timers.retain(|_, timer| {
360 timer.advance(to_time_ns).for_each(|event| {
361 self.heap.push(ScheduledTimeEvent::new(event));
362 });
363
364 !timer.is_expired()
365 });
366 }
367
368 #[must_use]
378 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
379 events
380 .into_iter()
381 .map(|event| {
382 let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
383 self.default_callback
386 .clone()
387 .expect("Default callback should exist")
388 });
389 TimeEventHandlerV2::new(event, callback)
390 })
391 .collect()
392 }
393}
394
395impl Iterator for TestClock {
396 type Item = TimeEventHandlerV2;
397
398 fn next(&mut self) -> Option<Self::Item> {
399 self.heap
400 .pop()
401 .map(|event| self.get_handler(event.into_inner()))
402 }
403}
404
405impl Default for TestClock {
406 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl Deref for TestClock {
413 type Target = AtomicTime;
414
415 fn deref(&self) -> &Self::Target {
416 &self.time
417 }
418}
419
420impl Clock for TestClock {
421 fn timestamp_ns(&self) -> UnixNanos {
422 self.time.get_time_ns()
423 }
424
425 fn timestamp_us(&self) -> u64 {
426 self.time.get_time_us()
427 }
428
429 fn timestamp_ms(&self) -> u64 {
430 self.time.get_time_ms()
431 }
432
433 fn timestamp(&self) -> f64 {
434 self.time.get_time()
435 }
436
437 fn timer_names(&self) -> Vec<&str> {
438 self.timers
439 .iter()
440 .filter(|(_, timer)| !timer.is_expired())
441 .map(|(k, _)| k.as_str())
442 .collect()
443 }
444
445 fn timer_count(&self) -> usize {
446 self.timers
447 .iter()
448 .filter(|(_, timer)| !timer.is_expired())
449 .count()
450 }
451
452 fn timer_exists(&self, name: &Ustr) -> bool {
453 self.timers.contains_key(name)
454 }
455
456 fn register_default_handler(&mut self, callback: TimeEventCallback) {
457 self.default_callback = Some(callback);
458 }
459
460 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
466 let callback = self
468 .callbacks
469 .get(&event.name)
470 .cloned()
471 .or_else(|| self.default_callback.clone())
472 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
473
474 TimeEventHandlerV2::new(event, callback)
475 }
476
477 fn set_time_alert_ns(
478 &mut self,
479 name: &str,
480 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
482 allow_past: Option<bool>,
483 ) -> anyhow::Result<()> {
484 check_valid_string_ascii(name, stringify!(name))?;
485
486 let name = Ustr::from(name);
487 let allow_past = allow_past.unwrap_or(true);
488
489 if self.timer_exists(&name) {
490 self.cancel_timer(name.as_str());
491 log::warn!("Timer '{name}' replaced");
492 }
493
494 check_predicate_true(
495 callback.is_some()
496 | self.callbacks.contains_key(&name)
497 | self.default_callback.is_some(),
498 "No callbacks provided",
499 )?;
500
501 match callback {
502 Some(callback_py) => self.callbacks.insert(name, callback_py),
503 None => None,
504 };
505
506 let ts_now = self.get_time_ns();
507
508 if alert_time_ns < ts_now {
509 if allow_past {
510 alert_time_ns = ts_now;
511 log::warn!(
512 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
513 alert_time_ns.to_rfc3339(),
514 );
515 } else {
516 anyhow::bail!(
517 "Timer '{name}' alert time {} was in the past (current time is {})",
518 alert_time_ns.to_rfc3339(),
519 ts_now.to_rfc3339(),
520 );
521 }
522 }
523
524 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
526 let fire_immediately = alert_time_ns == ts_now;
528
529 let timer = TestTimer::new(
530 name,
531 interval_ns,
532 ts_now,
533 Some(alert_time_ns),
534 fire_immediately,
535 );
536 self.timers.insert(name, timer);
537
538 Ok(())
539 }
540
541 fn set_timer_ns(
542 &mut self,
543 name: &str,
544 interval_ns: u64,
545 start_time_ns: Option<UnixNanos>,
546 stop_time_ns: Option<UnixNanos>,
547 callback: Option<TimeEventCallback>,
548 allow_past: Option<bool>,
549 fire_immediately: Option<bool>,
550 ) -> anyhow::Result<()> {
551 check_valid_string_ascii(name, stringify!(name))?;
552 check_positive_u64(interval_ns, stringify!(interval_ns))?;
553 check_predicate_true(
554 callback.is_some() | self.default_callback.is_some(),
555 "No callbacks provided",
556 )?;
557
558 let name = Ustr::from(name);
559 let allow_past = allow_past.unwrap_or(true);
560 let fire_immediately = fire_immediately.unwrap_or(false);
561
562 if self.timer_exists(&name) {
563 self.cancel_timer(name.as_str());
564 log::warn!("Timer '{name}' replaced");
565 }
566
567 match callback {
568 Some(callback_py) => self.callbacks.insert(name, callback_py),
569 None => None,
570 };
571
572 let mut start_time_ns = start_time_ns.unwrap_or_default();
573 let ts_now = self.get_time_ns();
574
575 if start_time_ns == 0 {
576 start_time_ns = self.timestamp_ns();
578 } else if !allow_past {
579 let next_event_time = if fire_immediately {
581 start_time_ns
582 } else {
583 start_time_ns + interval_ns
584 };
585
586 if next_event_time < ts_now {
588 anyhow::bail!(
589 "Timer '{name}' next event time {} would be in the past (current time is {})",
590 next_event_time.to_rfc3339(),
591 ts_now.to_rfc3339(),
592 );
593 }
594 }
595
596 if let Some(stop_time) = stop_time_ns {
597 if stop_time <= start_time_ns {
598 anyhow::bail!(
599 "Timer '{name}' stop time {} must be after start time {}",
600 stop_time.to_rfc3339(),
601 start_time_ns.to_rfc3339(),
602 );
603 }
604 if !allow_past && stop_time <= ts_now {
605 anyhow::bail!(
606 "Timer '{name}' stop time {} is in the past (current time is {})",
607 stop_time.to_rfc3339(),
608 ts_now.to_rfc3339(),
609 );
610 }
611 }
612
613 let interval_ns = create_valid_interval(interval_ns);
614
615 let timer = TestTimer::new(
616 name,
617 interval_ns,
618 start_time_ns,
619 stop_time_ns,
620 fire_immediately,
621 );
622 self.timers.insert(name, timer);
623
624 Ok(())
625 }
626
627 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
628 self.timers
629 .get(&Ustr::from(name))
630 .map(|timer| timer.next_time_ns())
631 }
632
633 fn cancel_timer(&mut self, name: &str) {
634 let timer = self.timers.remove(&Ustr::from(name));
635 if let Some(mut timer) = timer {
636 timer.cancel();
637 }
638 }
639
640 fn cancel_timers(&mut self) {
641 for timer in &mut self.timers.values_mut() {
642 timer.cancel();
643 }
644
645 self.timers.clear();
646 }
647
648 fn reset(&mut self) {
649 self.time = AtomicTime::new(false, UnixNanos::default());
650 self.timers = BTreeMap::new();
651 self.heap = BinaryHeap::new();
652 self.callbacks = HashMap::new();
653 }
654}
655
656#[derive(Debug)]
664pub struct LiveClock {
665 time: &'static AtomicTime,
666 timers: HashMap<Ustr, LiveTimer>,
667 default_callback: Option<TimeEventCallback>,
668 callbacks: HashMap<Ustr, TimeEventCallback>,
669 sender: Option<Arc<dyn TimeEventSender>>,
670}
671
672impl LiveClock {
673 #[must_use]
675 pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
676 Self {
677 time: get_atomic_clock_realtime(),
678 timers: HashMap::new(),
679 default_callback: None,
680 callbacks: HashMap::new(),
681 sender,
682 }
683 }
684
685 #[must_use]
686 pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
687 &self.timers
688 }
689
690 fn clear_expired_timers(&mut self) {
692 self.timers.retain(|_, timer| !timer.is_expired());
693 }
694}
695
696impl Default for LiveClock {
697 fn default() -> Self {
699 Self::new(Some(get_time_event_sender()))
700 }
701}
702
703impl Deref for LiveClock {
704 type Target = AtomicTime;
705
706 fn deref(&self) -> &Self::Target {
707 self.time
708 }
709}
710
711impl Clock for LiveClock {
712 fn timestamp_ns(&self) -> UnixNanos {
713 self.time.get_time_ns()
714 }
715
716 fn timestamp_us(&self) -> u64 {
717 self.time.get_time_us()
718 }
719
720 fn timestamp_ms(&self) -> u64 {
721 self.time.get_time_ms()
722 }
723
724 fn timestamp(&self) -> f64 {
725 self.time.get_time()
726 }
727
728 fn timer_names(&self) -> Vec<&str> {
729 self.timers
730 .iter()
731 .filter(|(_, timer)| !timer.is_expired())
732 .map(|(k, _)| k.as_str())
733 .collect()
734 }
735
736 fn timer_count(&self) -> usize {
737 self.timers
738 .iter()
739 .filter(|(_, timer)| !timer.is_expired())
740 .count()
741 }
742
743 fn timer_exists(&self, name: &Ustr) -> bool {
744 self.timers.contains_key(name)
745 }
746
747 fn register_default_handler(&mut self, handler: TimeEventCallback) {
748 self.default_callback = Some(handler);
749 }
750
751 #[allow(unused_variables)]
756 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
757 let callback = self
759 .callbacks
760 .get(&event.name)
761 .cloned()
762 .or_else(|| self.default_callback.clone())
763 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
764
765 TimeEventHandlerV2::new(event, callback)
766 }
767
768 fn set_time_alert_ns(
769 &mut self,
770 name: &str,
771 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
773 allow_past: Option<bool>,
774 ) -> anyhow::Result<()> {
775 check_valid_string_ascii(name, stringify!(name))?;
776
777 let name = Ustr::from(name);
778 let allow_past = allow_past.unwrap_or(true);
779
780 if self.timer_exists(&name) {
781 self.cancel_timer(name.as_str());
782 log::warn!("Timer '{name}' replaced");
783 }
784
785 check_predicate_true(
786 callback.is_some()
787 | self.callbacks.contains_key(&name)
788 | self.default_callback.is_some(),
789 "No callbacks provided",
790 )?;
791
792 let callback = if let Some(callback) = callback {
793 self.callbacks.insert(name, callback.clone());
794 callback
795 } else if let Some(existing) = self.callbacks.get(&name) {
796 existing.clone()
797 } else {
798 let default = self
799 .default_callback
800 .clone()
801 .expect("Default callback should exist");
802 self.callbacks.insert(name, default.clone());
803 default
804 };
805
806 let ts_now = self.get_time_ns();
807
808 if alert_time_ns < ts_now {
810 if allow_past {
811 alert_time_ns = ts_now;
812 log::warn!(
813 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
814 alert_time_ns.to_rfc3339(),
815 );
816 } else {
817 anyhow::bail!(
818 "Timer '{name}' alert time {} was in the past (current time is {})",
819 alert_time_ns.to_rfc3339(),
820 ts_now.to_rfc3339(),
821 );
822 }
823 }
824
825 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
827
828 let mut timer = LiveTimer::new(
829 name,
830 interval_ns,
831 ts_now,
832 Some(alert_time_ns),
833 callback,
834 false,
835 self.sender.clone(),
836 );
837
838 timer.start();
839
840 self.clear_expired_timers();
841 self.timers.insert(name, timer);
842
843 Ok(())
844 }
845
846 fn set_timer_ns(
847 &mut self,
848 name: &str,
849 interval_ns: u64,
850 start_time_ns: Option<UnixNanos>,
851 stop_time_ns: Option<UnixNanos>,
852 callback: Option<TimeEventCallback>,
853 allow_past: Option<bool>,
854 fire_immediately: Option<bool>,
855 ) -> anyhow::Result<()> {
856 check_valid_string_ascii(name, stringify!(name))?;
857 check_positive_u64(interval_ns, stringify!(interval_ns))?;
858 check_predicate_true(
859 callback.is_some() | self.default_callback.is_some(),
860 "No callbacks provided",
861 )?;
862
863 let name = Ustr::from(name);
864 let allow_past = allow_past.unwrap_or(true);
865 let fire_immediately = fire_immediately.unwrap_or(false);
866
867 if self.timer_exists(&name) {
868 self.cancel_timer(name.as_str());
869 log::warn!("Timer '{name}' replaced");
870 }
871
872 let callback = match callback {
873 Some(callback) => callback,
874 None => self.default_callback.clone().unwrap(),
875 };
876
877 self.callbacks.insert(name, callback.clone());
878
879 let mut start_time_ns = start_time_ns.unwrap_or_default();
880 let ts_now = self.get_time_ns();
881
882 if start_time_ns == 0 {
883 start_time_ns = self.timestamp_ns();
885 } else if start_time_ns < ts_now && !allow_past {
886 anyhow::bail!(
887 "Timer '{name}' start time {} was in the past (current time is {})",
888 start_time_ns.to_rfc3339(),
889 ts_now.to_rfc3339(),
890 );
891 }
892
893 if let Some(stop_time) = stop_time_ns {
894 if stop_time <= start_time_ns {
895 anyhow::bail!(
896 "Timer '{name}' stop time {} must be after start time {}",
897 stop_time.to_rfc3339(),
898 start_time_ns.to_rfc3339(),
899 );
900 }
901 if !allow_past && stop_time <= ts_now {
902 anyhow::bail!(
903 "Timer '{name}' stop time {} is in the past (current time is {})",
904 stop_time.to_rfc3339(),
905 ts_now.to_rfc3339(),
906 );
907 }
908 }
909
910 let interval_ns = create_valid_interval(interval_ns);
911
912 let mut timer = LiveTimer::new(
913 name,
914 interval_ns,
915 start_time_ns,
916 stop_time_ns,
917 callback,
918 fire_immediately,
919 self.sender.clone(),
920 );
921 timer.start();
922
923 self.clear_expired_timers();
924 self.timers.insert(name, timer);
925
926 Ok(())
927 }
928
929 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
930 self.timers
931 .get(&Ustr::from(name))
932 .map(|timer| timer.next_time_ns())
933 }
934
935 fn cancel_timer(&mut self, name: &str) {
936 let timer = self.timers.remove(&Ustr::from(name));
937 if let Some(mut timer) = timer {
938 timer.cancel();
939 }
940 }
941
942 fn cancel_timers(&mut self) {
943 for timer in &mut self.timers.values_mut() {
944 timer.cancel();
945 }
946
947 self.timers.clear();
948 }
949
950 fn reset(&mut self) {
951 self.cancel_timers();
952 self.callbacks.clear();
953 }
954}
955
956#[derive(Debug)]
958pub struct TimeEventStream {
959 heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>,
960}
961
962impl TimeEventStream {
963 pub const fn new(heap: Arc<Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
964 Self { heap }
965 }
966}
967
968impl Stream for TimeEventStream {
969 type Item = TimeEvent;
970
971 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
972 let mut heap = match self.heap.try_lock() {
973 Ok(guard) => guard,
974 Err(e) => {
975 tracing::error!("Unable to get LiveClock heap lock: {e}");
976 cx.waker().wake_by_ref();
977 return Poll::Pending;
978 }
979 };
980
981 if let Some(event) = heap.pop() {
982 Poll::Ready(Some(event.into_inner()))
983 } else {
984 cx.waker().wake_by_ref();
985 Poll::Pending
986 }
987 }
988}
989
990#[cfg(test)]
995mod tests {
996 use std::{
997 sync::{Arc, Mutex},
998 time::Duration,
999 };
1000
1001 use nautilus_core::time::get_atomic_clock_realtime;
1002 use rstest::{fixture, rstest};
1003 use ustr::Ustr;
1004
1005 use super::*;
1006 use crate::{runner::TimeEventSender, testing::wait_until};
1007
1008 #[derive(Debug, Default)]
1009 struct TestCallback {
1010 called: Arc<Mutex<bool>>,
1012 }
1013
1014 impl TestCallback {
1015 fn new(called: Arc<Mutex<bool>>) -> Self {
1016 Self { called }
1017 }
1018 }
1019
1020 impl From<TestCallback> for TimeEventCallback {
1021 fn from(callback: TestCallback) -> Self {
1022 TimeEventCallback::from(move |_event: TimeEvent| {
1023 if let Ok(mut called) = callback.called.lock() {
1024 *called = true;
1025 }
1026 })
1027 }
1028 }
1029
1030 #[derive(Debug)]
1031 struct CollectingSender {
1032 events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1033 }
1034
1035 impl CollectingSender {
1036 fn new(events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>) -> Self {
1037 Self { events }
1038 }
1039 }
1040
1041 impl TimeEventSender for CollectingSender {
1042 fn send(&self, handler: TimeEventHandlerV2) {
1043 let TimeEventHandlerV2 { event, callback } = handler;
1044 let now_ns = get_atomic_clock_realtime().get_time_ns();
1045 let event_clone = event.clone();
1046 callback.call(event);
1047 self.events.lock().unwrap().push((event_clone, now_ns));
1048 }
1049 }
1050
1051 fn wait_for_events(
1052 events: &Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
1053 target: usize,
1054 timeout: Duration,
1055 ) {
1056 wait_until(|| events.lock().unwrap().len() >= target, timeout);
1057 }
1058
1059 #[fixture]
1060 pub fn test_clock() -> TestClock {
1061 let mut clock = TestClock::new();
1062 clock.register_default_handler(TestCallback::default().into());
1063 clock
1064 }
1065
1066 #[rstest]
1067 fn test_time_monotonicity(mut test_clock: TestClock) {
1068 let initial_time = test_clock.timestamp_ns();
1069 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
1070 assert!(test_clock.timestamp_ns() > initial_time);
1071 }
1072
1073 #[rstest]
1074 fn test_timer_registration(mut test_clock: TestClock) {
1075 test_clock
1076 .set_time_alert_ns(
1077 "test_timer",
1078 (*test_clock.timestamp_ns() + 1000).into(),
1079 None,
1080 None,
1081 )
1082 .unwrap();
1083 assert_eq!(test_clock.timer_count(), 1);
1084 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
1085 }
1086
1087 #[rstest]
1088 fn test_timer_expiration(mut test_clock: TestClock) {
1089 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
1090 test_clock
1091 .set_time_alert_ns("test_timer", alert_time, None, None)
1092 .unwrap();
1093 let events = test_clock.advance_time(alert_time, true);
1094 assert_eq!(events.len(), 1);
1095 assert_eq!(events[0].name.as_str(), "test_timer");
1096 }
1097
1098 #[rstest]
1099 fn test_timer_cancellation(mut test_clock: TestClock) {
1100 test_clock
1101 .set_time_alert_ns(
1102 "test_timer",
1103 (*test_clock.timestamp_ns() + 1000).into(),
1104 None,
1105 None,
1106 )
1107 .unwrap();
1108 assert_eq!(test_clock.timer_count(), 1);
1109 test_clock.cancel_timer("test_timer");
1110 assert_eq!(test_clock.timer_count(), 0);
1111 }
1112
1113 #[rstest]
1114 fn test_time_advancement(mut test_clock: TestClock) {
1115 let start_time = test_clock.timestamp_ns();
1116 test_clock
1117 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
1118 .unwrap();
1119 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
1120 assert_eq!(events.len(), 2);
1121 assert_eq!(*events[0].ts_event, *start_time + 1000);
1122 assert_eq!(*events[1].ts_event, *start_time + 2000);
1123 }
1124
1125 #[rstest]
1126 fn test_default_and_custom_callbacks() {
1127 let mut clock = TestClock::new();
1128 let default_called = Arc::new(Mutex::new(false));
1129 let custom_called = Arc::new(Mutex::new(false));
1130
1131 let default_callback = TestCallback::new(Arc::clone(&default_called));
1132 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
1133
1134 clock.register_default_handler(TimeEventCallback::from(default_callback));
1135 clock
1136 .set_time_alert_ns(
1137 "default_timer",
1138 (*clock.timestamp_ns() + 1000).into(),
1139 None,
1140 None,
1141 )
1142 .unwrap();
1143 clock
1144 .set_time_alert_ns(
1145 "custom_timer",
1146 (*clock.timestamp_ns() + 1000).into(),
1147 Some(TimeEventCallback::from(custom_callback)),
1148 None,
1149 )
1150 .unwrap();
1151
1152 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
1153 let handlers = clock.match_handlers(events);
1154
1155 for handler in handlers {
1156 handler.callback.call(handler.event);
1157 }
1158
1159 assert!(*default_called.lock().unwrap());
1160 assert!(*custom_called.lock().unwrap());
1161 }
1162
1163 #[rstest]
1164 fn test_multiple_timers(mut test_clock: TestClock) {
1165 let start_time = test_clock.timestamp_ns();
1166 test_clock
1167 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1168 .unwrap();
1169 test_clock
1170 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1171 .unwrap();
1172 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
1173 assert_eq!(events.len(), 3);
1174 assert_eq!(events[0].name.as_str(), "timer1");
1175 assert_eq!(events[1].name.as_str(), "timer1");
1176 assert_eq!(events[2].name.as_str(), "timer2");
1177 }
1178
1179 #[rstest]
1180 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1181 test_clock.set_time(UnixNanos::from(2000));
1182 let current_time = test_clock.timestamp_ns();
1183 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1184
1185 test_clock
1187 .set_time_alert_ns("past_timer", past_time, None, Some(true))
1188 .unwrap();
1189
1190 assert_eq!(test_clock.timer_count(), 1);
1192 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1193
1194 let next_time = test_clock.next_time_ns("past_timer").unwrap();
1196 assert!(next_time >= current_time);
1197 }
1198
1199 #[rstest]
1200 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1201 test_clock.set_time(UnixNanos::from(2000));
1202 let current_time = test_clock.timestamp_ns();
1203 let past_time = current_time - 1000;
1204
1205 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1207
1208 assert!(result.is_err());
1210 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1211
1212 assert_eq!(test_clock.timer_count(), 0);
1214 assert!(test_clock.timer_names().is_empty());
1215 }
1216
1217 #[rstest]
1218 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1219 test_clock.set_time(UnixNanos::from(2000));
1220 let current_time = test_clock.timestamp_ns();
1221 let start_time = current_time + 1000;
1222 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
1226 "invalid_timer",
1227 100,
1228 Some(start_time),
1229 Some(stop_time),
1230 None,
1231 None,
1232 None,
1233 );
1234
1235 assert!(result.is_err());
1237 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1238
1239 assert_eq!(test_clock.timer_count(), 0);
1241 }
1242
1243 #[rstest]
1244 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1245 let start_time = test_clock.timestamp_ns();
1246 let interval_ns = 1000;
1247
1248 test_clock
1249 .set_timer_ns(
1250 "fire_immediately_timer",
1251 interval_ns,
1252 Some(start_time),
1253 None,
1254 None,
1255 None,
1256 Some(true),
1257 )
1258 .unwrap();
1259
1260 let events = test_clock.advance_time(start_time + 2500, true);
1262
1263 assert_eq!(events.len(), 3);
1265 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
1269
1270 #[rstest]
1271 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1272 let start_time = test_clock.timestamp_ns();
1273 let interval_ns = 1000;
1274
1275 test_clock
1276 .set_timer_ns(
1277 "normal_timer",
1278 interval_ns,
1279 Some(start_time),
1280 None,
1281 None,
1282 None,
1283 Some(false),
1284 )
1285 .unwrap();
1286
1287 let events = test_clock.advance_time(start_time + 2500, true);
1289
1290 assert_eq!(events.len(), 2);
1292 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1295
1296 #[rstest]
1297 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1298 let start_time = test_clock.timestamp_ns();
1299 let interval_ns = 1000;
1300
1301 test_clock
1303 .set_timer_ns(
1304 "default_timer",
1305 interval_ns,
1306 Some(start_time),
1307 None,
1308 None,
1309 None,
1310 None,
1311 )
1312 .unwrap();
1313
1314 let events = test_clock.advance_time(start_time + 1500, true);
1315
1316 assert_eq!(events.len(), 1);
1318 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1320
1321 #[rstest]
1322 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1323 test_clock.set_time(5000.into());
1324 let interval_ns = 1000;
1325
1326 test_clock
1327 .set_timer_ns(
1328 "zero_start_timer",
1329 interval_ns,
1330 None,
1331 None,
1332 None,
1333 None,
1334 Some(true),
1335 )
1336 .unwrap();
1337
1338 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1339
1340 assert_eq!(events.len(), 3);
1343 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1345 assert_eq!(*events[2].ts_event, 7000);
1346 }
1347
1348 #[rstest]
1349 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1350 let start_time = test_clock.timestamp_ns();
1351 let interval_ns = 1000;
1352
1353 test_clock
1355 .set_timer_ns(
1356 "immediate_timer",
1357 interval_ns,
1358 Some(start_time),
1359 None,
1360 None,
1361 None,
1362 Some(true),
1363 )
1364 .unwrap();
1365
1366 test_clock
1368 .set_timer_ns(
1369 "normal_timer",
1370 interval_ns,
1371 Some(start_time),
1372 None,
1373 None,
1374 None,
1375 Some(false),
1376 )
1377 .unwrap();
1378
1379 let events = test_clock.advance_time(start_time + 1500, true);
1380
1381 assert_eq!(events.len(), 3);
1383
1384 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1386 event_times.sort();
1387
1388 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1392
1393 #[rstest]
1394 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1395 let start_time = test_clock.timestamp_ns();
1396
1397 test_clock
1399 .set_timer_ns(
1400 "collision_timer",
1401 1000,
1402 Some(start_time),
1403 None,
1404 None,
1405 None,
1406 None,
1407 )
1408 .unwrap();
1409
1410 let result = test_clock.set_timer_ns(
1412 "collision_timer",
1413 2000,
1414 Some(start_time),
1415 None,
1416 None,
1417 None,
1418 None,
1419 );
1420
1421 assert!(result.is_ok());
1422 assert_eq!(test_clock.timer_count(), 1);
1424
1425 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1427 assert_eq!(next_time, start_time + 2000);
1429 }
1430
1431 #[rstest]
1432 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1433 let start_time = test_clock.timestamp_ns();
1434
1435 let result =
1437 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1438
1439 assert!(result.is_err());
1440 assert_eq!(test_clock.timer_count(), 0);
1441 }
1442
1443 #[rstest]
1444 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1445 let start_time = test_clock.timestamp_ns();
1446
1447 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1449
1450 assert!(result.is_err());
1451 assert_eq!(test_clock.timer_count(), 0);
1452 }
1453
1454 #[rstest]
1455 fn test_timer_exists(mut test_clock: TestClock) {
1456 let name = Ustr::from("exists_timer");
1457 assert!(!test_clock.timer_exists(&name));
1458
1459 test_clock
1460 .set_time_alert_ns(
1461 name.as_str(),
1462 (*test_clock.timestamp_ns() + 1_000).into(),
1463 None,
1464 None,
1465 )
1466 .unwrap();
1467
1468 assert!(test_clock.timer_exists(&name));
1469 }
1470
1471 #[rstest]
1472 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1473 test_clock.set_time(UnixNanos::from(10_000));
1474 let current = test_clock.timestamp_ns();
1475
1476 let result = test_clock.set_timer_ns(
1477 "past_stop",
1478 10_000,
1479 Some(current - 500),
1480 Some(current - 100),
1481 None,
1482 Some(false),
1483 None,
1484 );
1485
1486 let err = result.expect_err("expected stop time validation error");
1487 let err_msg = err.to_string();
1488 assert!(err_msg.contains("stop time"));
1489 assert!(err_msg.contains("in the past"));
1490 }
1491
1492 #[rstest]
1493 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1494 let current = test_clock.timestamp_ns();
1495
1496 let result = test_clock.set_timer_ns(
1497 "future_stop",
1498 1_000,
1499 Some(current),
1500 Some(current + 10_000),
1501 None,
1502 Some(false),
1503 None,
1504 );
1505
1506 assert!(result.is_ok());
1507 }
1508
1509 #[rstest]
1510 fn test_live_clock_timer_replacement_cancels_previous_task() {
1511 let events = Arc::new(Mutex::new(Vec::new()));
1512 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1513
1514 let mut clock = LiveClock::new(Some(sender));
1515 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1516
1517 let fast_interval = Duration::from_millis(10).as_nanos() as u64;
1518 clock
1519 .set_timer_ns("replace", fast_interval, None, None, None, None, None)
1520 .unwrap();
1521
1522 wait_for_events(&events, 2, Duration::from_millis(200));
1523 events.lock().unwrap().clear();
1524
1525 let slow_interval = Duration::from_millis(30).as_nanos() as u64;
1526 clock
1527 .set_timer_ns("replace", slow_interval, None, None, None, None, None)
1528 .unwrap();
1529
1530 wait_for_events(&events, 3, Duration::from_millis(300));
1531
1532 let snapshot = events.lock().unwrap().clone();
1533 let diffs: Vec<u64> = snapshot
1534 .windows(2)
1535 .map(|pair| pair[1].0.ts_event.as_u64() - pair[0].0.ts_event.as_u64())
1536 .collect();
1537
1538 assert!(!diffs.is_empty());
1539 for diff in diffs {
1540 assert_ne!(diff, fast_interval);
1541 }
1542
1543 clock.cancel_timers();
1544 }
1545
1546 #[rstest]
1547 fn test_live_clock_time_alert_persists_callback() {
1548 let events = Arc::new(Mutex::new(Vec::new()));
1549 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1550
1551 let mut clock = LiveClock::new(Some(sender));
1552 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1553
1554 let now = clock.timestamp_ns();
1555 let alert_time = now + 1_000_u64;
1556
1557 clock
1558 .set_time_alert_ns("alert-callback", alert_time, None, None)
1559 .unwrap();
1560
1561 assert!(clock.callbacks.contains_key(&Ustr::from("alert-callback")));
1562
1563 clock.cancel_timers();
1564 }
1565
1566 #[rstest]
1567 fn test_live_clock_reset_stops_active_timers() {
1568 let events = Arc::new(Mutex::new(Vec::new()));
1569 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1570
1571 let mut clock = LiveClock::new(Some(sender));
1572 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1573
1574 clock
1575 .set_timer_ns(
1576 "reset-test",
1577 Duration::from_millis(15).as_nanos() as u64,
1578 None,
1579 None,
1580 None,
1581 None,
1582 None,
1583 )
1584 .unwrap();
1585
1586 wait_for_events(&events, 2, Duration::from_millis(250));
1587
1588 clock.reset();
1589
1590 let start = std::time::Instant::now();
1592 wait_until(
1593 || start.elapsed() >= Duration::from_millis(50),
1594 Duration::from_secs(2),
1595 );
1596
1597 events.lock().unwrap().clear();
1599
1600 let start = std::time::Instant::now();
1602 wait_until(
1603 || start.elapsed() >= Duration::from_millis(50),
1604 Duration::from_secs(2),
1605 );
1606 assert!(events.lock().unwrap().is_empty());
1607 }
1608
1609 #[rstest]
1610 fn test_live_timer_short_delay_not_early() {
1611 let events = Arc::new(Mutex::new(Vec::new()));
1612 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
1613
1614 let mut clock = LiveClock::new(Some(sender));
1615 clock.register_default_handler(TimeEventCallback::from(|_| {}));
1616
1617 let now = clock.timestamp_ns();
1618 let start_time = UnixNanos::from(*now + 500_000); let interval_ns = 1_000_000;
1620
1621 clock
1622 .set_timer_ns(
1623 "short-delay",
1624 interval_ns,
1625 Some(start_time),
1626 None,
1627 None,
1628 None,
1629 Some(true),
1630 )
1631 .unwrap();
1632
1633 wait_for_events(&events, 1, Duration::from_millis(100));
1634
1635 let snapshot = events.lock().unwrap().clone();
1636 assert!(!snapshot.is_empty());
1637
1638 for (event, actual_ts) in &snapshot {
1639 assert!(actual_ts.as_u64() >= event.ts_event.as_u64());
1640 }
1641
1642 clock.cancel_timers();
1643 }
1644
1645 #[rstest]
1646 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1647 let start_time = test_clock.timestamp_ns();
1648 let interval_ns = 1000;
1649 let stop_time = start_time + interval_ns; test_clock
1652 .set_timer_ns(
1653 "exact_stop",
1654 interval_ns,
1655 Some(start_time),
1656 Some(stop_time),
1657 None,
1658 None,
1659 Some(true),
1660 )
1661 .unwrap();
1662
1663 let events = test_clock.advance_time(stop_time, true);
1664
1665 assert_eq!(events.len(), 2);
1667 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1670
1671 #[rstest]
1672 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1673 let start_time = test_clock.timestamp_ns();
1674 let interval_ns = 1000;
1675
1676 test_clock
1677 .set_timer_ns(
1678 "exact_advance",
1679 interval_ns,
1680 Some(start_time),
1681 None,
1682 None,
1683 None,
1684 Some(false),
1685 )
1686 .unwrap();
1687
1688 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1690 let events = test_clock.advance_time(next_time, true);
1691
1692 assert_eq!(events.len(), 1);
1693 assert_eq!(*events[0].ts_event, *next_time);
1694 }
1695
1696 #[rstest]
1697 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1698 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1708 "bar_timer",
1709 interval_ns,
1710 Some(bar_start_time),
1711 None,
1712 None,
1713 Some(false), Some(false), );
1716
1717 assert!(result.is_ok());
1719 assert_eq!(test_clock.timer_count(), 1);
1720
1721 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1723 assert_eq!(*next_time, 101_000);
1724 }
1725
1726 #[rstest]
1727 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1728 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1737 "past_event_timer",
1738 interval_ns,
1739 Some(past_start_time),
1740 None,
1741 None,
1742 Some(false), Some(false), );
1745
1746 assert!(result.is_err());
1748 assert!(
1749 result
1750 .unwrap_err()
1751 .to_string()
1752 .contains("would be in the past")
1753 );
1754 }
1755
1756 #[rstest]
1757 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1758 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1762
1763 let result = test_clock.set_timer_ns(
1766 "immediate_past_timer",
1767 interval_ns,
1768 Some(past_start_time),
1769 None,
1770 None,
1771 Some(false), Some(true), );
1774
1775 assert!(result.is_err());
1777 assert!(
1778 result
1779 .unwrap_err()
1780 .to_string()
1781 .contains("would be in the past")
1782 );
1783 }
1784
1785 #[rstest]
1786 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1787 let start_time = test_clock.timestamp_ns();
1788
1789 test_clock
1790 .set_timer_ns(
1791 "cancel_test",
1792 1000,
1793 Some(start_time),
1794 None,
1795 None,
1796 None,
1797 None,
1798 )
1799 .unwrap();
1800
1801 assert_eq!(test_clock.timer_count(), 1);
1802
1803 test_clock.cancel_timer("cancel_test");
1805
1806 assert_eq!(test_clock.timer_count(), 0);
1807
1808 let events = test_clock.advance_time(start_time + 2000, true);
1810 assert_eq!(events.len(), 0);
1811 }
1812
1813 #[rstest]
1814 fn test_cancel_all_timers(mut test_clock: TestClock) {
1815 test_clock
1817 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1818 .unwrap();
1819 test_clock
1820 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1821 .unwrap();
1822 test_clock
1823 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1824 .unwrap();
1825
1826 assert_eq!(test_clock.timer_count(), 3);
1827
1828 test_clock.cancel_timers();
1830
1831 assert_eq!(test_clock.timer_count(), 0);
1832
1833 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1835 assert_eq!(events.len(), 0);
1836 }
1837
1838 #[rstest]
1839 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1840 test_clock
1841 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1842 .unwrap();
1843
1844 assert_eq!(test_clock.timer_count(), 1);
1845
1846 test_clock.reset();
1848
1849 assert_eq!(test_clock.timer_count(), 0);
1850 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1852
1853 #[rstest]
1854 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1855 let current_time = test_clock.utc_now();
1856 let alert_time = current_time + chrono::Duration::seconds(1);
1857
1858 test_clock
1860 .set_time_alert("alert_test", alert_time, None, None)
1861 .unwrap();
1862
1863 assert_eq!(test_clock.timer_count(), 1);
1864 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1865
1866 let expected_ns = UnixNanos::from(alert_time);
1868 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1869
1870 let diff = if next_time >= expected_ns {
1872 next_time.as_u64() - expected_ns.as_u64()
1873 } else {
1874 expected_ns.as_u64() - next_time.as_u64()
1875 };
1876 assert!(
1877 diff < 1000,
1878 "Timer should be set within 1 microsecond of expected time"
1879 );
1880 }
1881
1882 #[rstest]
1883 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1884 let current_time = test_clock.utc_now();
1885 let start_time = current_time + chrono::Duration::seconds(1);
1886 let interval = Duration::from_millis(500);
1887
1888 test_clock
1890 .set_timer(
1891 "timer_test",
1892 interval,
1893 Some(start_time),
1894 None,
1895 None,
1896 None,
1897 None,
1898 )
1899 .unwrap();
1900
1901 assert_eq!(test_clock.timer_count(), 1);
1902 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1903
1904 let start_ns = UnixNanos::from(start_time);
1906 let interval_ns = interval.as_nanos() as u64;
1907
1908 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1909 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1913 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1914 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1915 }
1916
1917 #[rstest]
1918 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1919 let current_time = test_clock.utc_now();
1920 let start_time = current_time + chrono::Duration::seconds(1);
1921 let stop_time = current_time + chrono::Duration::seconds(3);
1922 let interval = Duration::from_secs(1);
1923
1924 test_clock
1926 .set_timer(
1927 "timer_with_stop",
1928 interval,
1929 Some(start_time),
1930 Some(stop_time),
1931 None,
1932 None,
1933 None,
1934 )
1935 .unwrap();
1936
1937 assert_eq!(test_clock.timer_count(), 1);
1938
1939 let stop_ns = UnixNanos::from(stop_time);
1941 let events = test_clock.advance_time(stop_ns + 1000, true);
1942
1943 assert_eq!(events.len(), 2);
1945
1946 let start_ns = UnixNanos::from(start_time);
1947 let interval_ns = interval.as_nanos() as u64;
1948 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1949 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1950 }
1951
1952 #[rstest]
1953 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1954 let current_time = test_clock.utc_now();
1955 let start_time = current_time + chrono::Duration::seconds(1);
1956 let interval = Duration::from_millis(500);
1957
1958 test_clock
1960 .set_timer(
1961 "immediate_timer",
1962 interval,
1963 Some(start_time),
1964 None,
1965 None,
1966 None,
1967 Some(true),
1968 )
1969 .unwrap();
1970
1971 let start_ns = UnixNanos::from(start_time);
1972 let interval_ns = interval.as_nanos() as u64;
1973
1974 let events = test_clock.advance_time(start_ns + interval_ns, true);
1976
1977 assert_eq!(events.len(), 2);
1979 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1982
1983 #[rstest]
1984 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1985 let current_time = test_clock.timestamp_ns();
1986
1987 test_clock
1989 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1990 .unwrap();
1991
1992 assert_eq!(test_clock.timer_count(), 1);
1993
1994 let events = test_clock.advance_time(current_time, true);
1996
1997 assert_eq!(events.len(), 1);
1999 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
2000 assert_eq!(*events[0].ts_event, *current_time);
2001 }
2002}