1use std::{
19 collections::{BTreeMap, BinaryHeap, HashMap},
20 fmt::Debug,
21 ops::Deref,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25 time::Duration,
26};
27
28use chrono::{DateTime, Utc};
29use futures::Stream;
30use nautilus_core::{
31 AtomicTime, UnixNanos,
32 correctness::{check_positive_u64, check_predicate_true, check_valid_string},
33 time::get_atomic_clock_realtime,
34};
35use tokio::sync::Mutex;
36use ustr::Ustr;
37
38use crate::{
39 runner::{TimeEventSender, get_time_event_sender},
40 timer::{
41 LiveTimer, TestTimer, TimeEvent, TimeEventCallback, TimeEventHandlerV2,
42 create_valid_interval,
43 },
44};
45
46pub trait Clock: Debug {
52 fn utc_now(&self) -> DateTime<Utc> {
54 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
55 }
56
57 fn timestamp_ns(&self) -> UnixNanos;
59
60 fn timestamp_us(&self) -> u64;
62
63 fn timestamp_ms(&self) -> u64;
65
66 fn timestamp(&self) -> f64;
68
69 fn timer_names(&self) -> Vec<&str>;
71
72 fn timer_count(&self) -> usize;
74
75 fn register_default_handler(&mut self, callback: TimeEventCallback);
78
79 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2;
83
84 #[allow(clippy::too_many_arguments)]
98 fn set_time_alert(
99 &mut self,
100 name: &str,
101 alert_time: DateTime<Utc>,
102 callback: Option<TimeEventCallback>,
103 allow_past: Option<bool>,
104 ) -> anyhow::Result<()> {
105 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
106 }
107
108 #[allow(clippy::too_many_arguments)]
127 fn set_time_alert_ns(
128 &mut self,
129 name: &str,
130 alert_time_ns: UnixNanos,
131 callback: Option<TimeEventCallback>,
132 allow_past: Option<bool>,
133 ) -> anyhow::Result<()>;
134
135 #[allow(clippy::too_many_arguments)]
149 fn set_timer(
150 &mut self,
151 name: &str,
152 interval: Duration,
153 start_time: Option<DateTime<Utc>>,
154 stop_time: Option<DateTime<Utc>>,
155 callback: Option<TimeEventCallback>,
156 allow_past: Option<bool>,
157 fire_immediately: Option<bool>,
158 ) -> anyhow::Result<()> {
159 self.set_timer_ns(
160 name,
161 interval.as_nanos() as u64,
162 start_time.map(UnixNanos::from),
163 stop_time.map(UnixNanos::from),
164 callback,
165 allow_past,
166 fire_immediately,
167 )
168 }
169
170 #[allow(clippy::too_many_arguments)]
191 fn set_timer_ns(
192 &mut self,
193 name: &str,
194 interval_ns: u64,
195 start_time_ns: Option<UnixNanos>,
196 stop_time_ns: Option<UnixNanos>,
197 callback: Option<TimeEventCallback>,
198 allow_past: Option<bool>,
199 fire_immediately: Option<bool>,
200 ) -> anyhow::Result<()>;
201
202 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
206
207 fn cancel_timer(&mut self, name: &str);
209
210 fn cancel_timers(&mut self);
212
213 fn reset(&mut self);
215}
216
217#[derive(Debug)]
221pub struct TestClock {
222 time: AtomicTime,
223 timers: BTreeMap<Ustr, TestTimer>,
225 default_callback: Option<TimeEventCallback>,
226 callbacks: HashMap<Ustr, TimeEventCallback>,
227 heap: BinaryHeap<TimeEvent>, }
229
230impl TestClock {
231 #[must_use]
233 pub fn new() -> Self {
234 Self {
235 time: AtomicTime::new(false, UnixNanos::default()),
236 timers: BTreeMap::new(),
237 default_callback: None,
238 callbacks: HashMap::new(),
239 heap: BinaryHeap::new(),
240 }
241 }
242
243 #[must_use]
245 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
246 &self.timers
247 }
248
249 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
262 assert!(
264 to_time_ns >= self.time.get_time_ns(),
265 "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
266 self.time.get_time_ns()
267 );
268
269 if set_time {
270 self.time.set_time(to_time_ns);
271 }
272
273 let mut events: Vec<TimeEvent> = Vec::new();
275 self.timers.retain(|_, timer| {
276 timer.advance(to_time_ns).for_each(|event| {
277 events.push(event);
278 });
279
280 !timer.is_expired()
281 });
282
283 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
284 events
285 }
286
287 pub fn advance_to_time_on_heap(&mut self, to_time_ns: UnixNanos) {
297 assert!(
299 to_time_ns >= self.time.get_time_ns(),
300 "`to_time_ns` {to_time_ns} was < `self.time.get_time_ns()` {}",
301 self.time.get_time_ns()
302 );
303
304 self.time.set_time(to_time_ns);
305
306 self.timers.retain(|_, timer| {
308 timer.advance(to_time_ns).for_each(|event| {
309 self.heap.push(event);
310 });
311
312 !timer.is_expired()
313 });
314 }
315
316 #[must_use]
326 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandlerV2> {
327 events
328 .into_iter()
329 .map(|event| {
330 let callback = self.callbacks.get(&event.name).cloned().unwrap_or_else(|| {
331 self.default_callback
334 .clone()
335 .expect("Default callback should exist")
336 });
337 TimeEventHandlerV2::new(event, callback)
338 })
339 .collect()
340 }
341}
342
343impl Iterator for TestClock {
344 type Item = TimeEventHandlerV2;
345
346 fn next(&mut self) -> Option<Self::Item> {
347 self.heap.pop().map(|event| self.get_handler(event))
348 }
349}
350
351impl Default for TestClock {
352 fn default() -> Self {
354 Self::new()
355 }
356}
357
358impl Deref for TestClock {
359 type Target = AtomicTime;
360
361 fn deref(&self) -> &Self::Target {
362 &self.time
363 }
364}
365
366impl Clock for TestClock {
367 fn timestamp_ns(&self) -> UnixNanos {
368 self.time.get_time_ns()
369 }
370
371 fn timestamp_us(&self) -> u64 {
372 self.time.get_time_us()
373 }
374
375 fn timestamp_ms(&self) -> u64 {
376 self.time.get_time_ms()
377 }
378
379 fn timestamp(&self) -> f64 {
380 self.time.get_time()
381 }
382
383 fn timer_names(&self) -> Vec<&str> {
384 self.timers
385 .iter()
386 .filter(|(_, timer)| !timer.is_expired())
387 .map(|(k, _)| k.as_str())
388 .collect()
389 }
390
391 fn timer_count(&self) -> usize {
392 self.timers
393 .iter()
394 .filter(|(_, timer)| !timer.is_expired())
395 .count()
396 }
397
398 fn register_default_handler(&mut self, callback: TimeEventCallback) {
399 self.default_callback = Some(callback);
400 }
401
402 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
408 let callback = self
410 .callbacks
411 .get(&event.name)
412 .cloned()
413 .or_else(|| self.default_callback.clone())
414 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
415
416 TimeEventHandlerV2::new(event, callback)
417 }
418
419 fn set_time_alert_ns(
420 &mut self,
421 name: &str,
422 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
424 allow_past: Option<bool>,
425 ) -> anyhow::Result<()> {
426 check_valid_string(name, stringify!(name))?;
427
428 let name = Ustr::from(name);
429 let allow_past = allow_past.unwrap_or(true);
430
431 check_predicate_true(
432 callback.is_some()
433 | self.callbacks.contains_key(&name)
434 | self.default_callback.is_some(),
435 "No callbacks provided",
436 )?;
437
438 match callback {
439 Some(callback_py) => self.callbacks.insert(name, callback_py),
440 None => None,
441 };
442
443 self.cancel_timer(name.as_str());
445
446 let ts_now = self.get_time_ns();
447
448 if alert_time_ns < ts_now {
449 if allow_past {
450 alert_time_ns = ts_now;
451 log::warn!(
452 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
453 alert_time_ns.to_rfc3339(),
454 );
455 } else {
456 anyhow::bail!(
457 "Timer '{name}' alert time {} was in the past (current time is {})",
458 alert_time_ns.to_rfc3339(),
459 ts_now.to_rfc3339(),
460 );
461 }
462 }
463
464 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
466 let fire_immediately = alert_time_ns == ts_now;
468
469 let timer = TestTimer::new(
470 name,
471 interval_ns,
472 ts_now,
473 Some(alert_time_ns),
474 fire_immediately,
475 );
476 self.timers.insert(name, timer);
477
478 Ok(())
479 }
480
481 fn set_timer_ns(
482 &mut self,
483 name: &str,
484 interval_ns: u64,
485 start_time_ns: Option<UnixNanos>,
486 stop_time_ns: Option<UnixNanos>,
487 callback: Option<TimeEventCallback>,
488 allow_past: Option<bool>,
489 fire_immediately: Option<bool>,
490 ) -> anyhow::Result<()> {
491 check_valid_string(name, stringify!(name))?;
492 check_positive_u64(interval_ns, stringify!(interval_ns))?;
493 check_predicate_true(
494 callback.is_some() | self.default_callback.is_some(),
495 "No callbacks provided",
496 )?;
497
498 let name = Ustr::from(name);
499 let allow_past = allow_past.unwrap_or(true);
500 let fire_immediately = fire_immediately.unwrap_or(false);
501
502 match callback {
503 Some(callback_py) => self.callbacks.insert(name, callback_py),
504 None => None,
505 };
506
507 let mut start_time_ns = start_time_ns.unwrap_or_default();
508 let ts_now = self.get_time_ns();
509
510 if start_time_ns == 0 {
511 start_time_ns = self.timestamp_ns();
513 } else if !allow_past {
514 let next_event_time = if fire_immediately {
516 start_time_ns
517 } else {
518 start_time_ns + interval_ns
519 };
520
521 if next_event_time < ts_now {
523 anyhow::bail!(
524 "Timer '{name}' next event time {} would be in the past (current time is {})",
525 next_event_time.to_rfc3339(),
526 ts_now.to_rfc3339(),
527 );
528 }
529 }
530
531 if let Some(stop_time) = stop_time_ns
532 && stop_time <= start_time_ns
533 {
534 anyhow::bail!(
535 "Timer '{name}' stop time {} must be after start time {}",
536 stop_time.to_rfc3339(),
537 start_time_ns.to_rfc3339(),
538 );
539 }
540
541 let interval_ns = create_valid_interval(interval_ns);
542
543 let timer = TestTimer::new(
544 name,
545 interval_ns,
546 start_time_ns,
547 stop_time_ns,
548 fire_immediately,
549 );
550 self.timers.insert(name, timer);
551
552 Ok(())
553 }
554
555 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
556 self.timers
557 .get(&Ustr::from(name))
558 .map(|timer| timer.next_time_ns())
559 }
560
561 fn cancel_timer(&mut self, name: &str) {
562 let timer = self.timers.remove(&Ustr::from(name));
563 if let Some(mut timer) = timer {
564 timer.cancel();
565 }
566 }
567
568 fn cancel_timers(&mut self) {
569 for timer in &mut self.timers.values_mut() {
570 timer.cancel();
571 }
572
573 self.timers.clear();
574 }
575
576 fn reset(&mut self) {
577 self.time = AtomicTime::new(false, UnixNanos::default());
578 self.timers = BTreeMap::new();
579 self.heap = BinaryHeap::new();
580 self.callbacks = HashMap::new();
581 }
582}
583
584#[derive(Debug)]
588pub struct LiveClock {
589 time: &'static AtomicTime,
590 timers: HashMap<Ustr, LiveTimer>,
591 default_callback: Option<TimeEventCallback>,
592 callbacks: HashMap<Ustr, TimeEventCallback>,
593 sender: Option<Arc<dyn TimeEventSender>>,
594}
595
596impl LiveClock {
597 #[must_use]
599 pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
600 Self {
601 time: get_atomic_clock_realtime(),
602 timers: HashMap::new(),
603 default_callback: None,
604 callbacks: HashMap::new(),
605 sender,
606 }
607 }
608
609 #[must_use]
610 pub const fn get_timers(&self) -> &HashMap<Ustr, LiveTimer> {
611 &self.timers
612 }
613
614 fn clear_expired_timers(&mut self) {
616 self.timers.retain(|_, timer| !timer.is_expired());
617 }
618}
619
620impl Default for LiveClock {
621 fn default() -> Self {
623 Self::new(Some(get_time_event_sender()))
624 }
625}
626
627impl Deref for LiveClock {
628 type Target = AtomicTime;
629
630 fn deref(&self) -> &Self::Target {
631 self.time
632 }
633}
634
635impl Clock for LiveClock {
636 fn timestamp_ns(&self) -> UnixNanos {
637 self.time.get_time_ns()
638 }
639
640 fn timestamp_us(&self) -> u64 {
641 self.time.get_time_us()
642 }
643
644 fn timestamp_ms(&self) -> u64 {
645 self.time.get_time_ms()
646 }
647
648 fn timestamp(&self) -> f64 {
649 self.time.get_time()
650 }
651
652 fn timer_names(&self) -> Vec<&str> {
653 self.timers
654 .iter()
655 .filter(|(_, timer)| !timer.is_expired())
656 .map(|(k, _)| k.as_str())
657 .collect()
658 }
659
660 fn timer_count(&self) -> usize {
661 self.timers
662 .iter()
663 .filter(|(_, timer)| !timer.is_expired())
664 .count()
665 }
666
667 fn register_default_handler(&mut self, handler: TimeEventCallback) {
668 self.default_callback = Some(handler);
669 }
670
671 #[allow(unused_variables)]
676 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
677 let callback = self
679 .callbacks
680 .get(&event.name)
681 .cloned()
682 .or_else(|| self.default_callback.clone())
683 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
684
685 TimeEventHandlerV2::new(event, callback)
686 }
687
688 fn set_time_alert_ns(
689 &mut self,
690 name: &str,
691 mut alert_time_ns: UnixNanos, callback: Option<TimeEventCallback>,
693 allow_past: Option<bool>,
694 ) -> anyhow::Result<()> {
695 check_valid_string(name, stringify!(name))?;
696
697 let name = Ustr::from(name);
698 let allow_past = allow_past.unwrap_or(true);
699
700 check_predicate_true(
701 callback.is_some()
702 | self.callbacks.contains_key(&name)
703 | self.default_callback.is_some(),
704 "No callbacks provided",
705 )?;
706
707 let callback = match callback {
708 Some(callback) => callback,
709 None => {
710 if self.callbacks.contains_key(&name) {
711 self.callbacks.get(&name).unwrap().clone()
712 } else {
713 self.default_callback.clone().unwrap()
714 }
715 }
716 };
717
718 self.cancel_timer(name.as_str());
720
721 let ts_now = self.get_time_ns();
722
723 if alert_time_ns < ts_now {
725 if allow_past {
726 alert_time_ns = ts_now;
727 log::warn!(
728 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
729 alert_time_ns.to_rfc3339(),
730 );
731 } else {
732 anyhow::bail!(
733 "Timer '{name}' alert time {} was in the past (current time is {})",
734 alert_time_ns.to_rfc3339(),
735 ts_now.to_rfc3339(),
736 );
737 }
738 }
739
740 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
742
743 let mut timer = LiveTimer::new(
744 name,
745 interval_ns,
746 ts_now,
747 Some(alert_time_ns),
748 callback,
749 false,
750 self.sender.clone(),
751 );
752
753 timer.start();
754
755 self.clear_expired_timers();
756 self.timers.insert(name, timer);
757
758 Ok(())
759 }
760
761 fn set_timer_ns(
762 &mut self,
763 name: &str,
764 interval_ns: u64,
765 start_time_ns: Option<UnixNanos>,
766 stop_time_ns: Option<UnixNanos>,
767 callback: Option<TimeEventCallback>,
768 allow_past: Option<bool>,
769 fire_immediately: Option<bool>,
770 ) -> anyhow::Result<()> {
771 check_valid_string(name, stringify!(name))?;
772 check_positive_u64(interval_ns, stringify!(interval_ns))?;
773 check_predicate_true(
774 callback.is_some() | self.default_callback.is_some(),
775 "No callbacks provided",
776 )?;
777
778 let name = Ustr::from(name);
779 let allow_past = allow_past.unwrap_or(true);
780 let fire_immediately = fire_immediately.unwrap_or(false);
781
782 let callback = match callback {
783 Some(callback) => callback,
784 None => self.default_callback.clone().unwrap(),
785 };
786
787 self.callbacks.insert(name, callback.clone());
788
789 let mut start_time_ns = start_time_ns.unwrap_or_default();
790 let ts_now = self.get_time_ns();
791
792 if start_time_ns == 0 {
793 start_time_ns = self.timestamp_ns();
795 } else if start_time_ns < ts_now && !allow_past {
796 anyhow::bail!(
797 "Timer '{name}' start time {} was in the past (current time is {})",
798 start_time_ns.to_rfc3339(),
799 ts_now.to_rfc3339(),
800 );
801 }
802
803 if let Some(stop_time) = stop_time_ns
804 && stop_time <= start_time_ns
805 {
806 anyhow::bail!(
807 "Timer '{name}' stop time {} must be after start time {}",
808 stop_time.to_rfc3339(),
809 start_time_ns.to_rfc3339(),
810 );
811 }
812
813 let interval_ns = create_valid_interval(interval_ns);
814
815 let mut timer = LiveTimer::new(
816 name,
817 interval_ns,
818 start_time_ns,
819 stop_time_ns,
820 callback,
821 fire_immediately,
822 self.sender.clone(),
823 );
824 timer.start();
825
826 self.clear_expired_timers();
827 self.timers.insert(name, timer);
828
829 Ok(())
830 }
831
832 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
833 self.timers
834 .get(&Ustr::from(name))
835 .map(|timer| timer.next_time_ns())
836 }
837
838 fn cancel_timer(&mut self, name: &str) {
839 let timer = self.timers.remove(&Ustr::from(name));
840 if let Some(mut timer) = timer {
841 timer.cancel();
842 }
843 }
844
845 fn cancel_timers(&mut self) {
846 for timer in &mut self.timers.values_mut() {
847 timer.cancel();
848 }
849
850 self.timers.clear();
851 }
852
853 fn reset(&mut self) {
854 self.timers.clear();
855 self.callbacks.clear();
856 }
857}
858
859#[derive(Debug)]
861pub struct TimeEventStream {
862 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
863}
864
865impl TimeEventStream {
866 pub const fn new(heap: Arc<Mutex<BinaryHeap<TimeEvent>>>) -> Self {
867 Self { heap }
868 }
869}
870
871impl Stream for TimeEventStream {
872 type Item = TimeEvent;
873
874 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
875 let mut heap = match self.heap.try_lock() {
876 Ok(guard) => guard,
877 Err(e) => {
878 tracing::error!("Unable to get LiveClock heap lock: {e}");
879 cx.waker().wake_by_ref();
880 return Poll::Pending;
881 }
882 };
883
884 if let Some(event) = heap.pop() {
885 Poll::Ready(Some(event))
886 } else {
887 cx.waker().wake_by_ref();
888 Poll::Pending
889 }
890 }
891}
892
893#[cfg(test)]
897mod tests {
898 use std::{cell::RefCell, rc::Rc};
899
900 use rstest::{fixture, rstest};
901
902 use super::*;
903
904 #[derive(Default)]
905 struct TestCallback {
906 called: Rc<RefCell<bool>>,
907 }
908
909 impl TestCallback {
910 const fn new(called: Rc<RefCell<bool>>) -> Self {
911 Self { called }
912 }
913 }
914
915 impl From<TestCallback> for TimeEventCallback {
916 fn from(callback: TestCallback) -> Self {
917 Self::Rust(Rc::new(move |_event: TimeEvent| {
918 *callback.called.borrow_mut() = true;
919 }))
920 }
921 }
922
923 #[fixture]
924 pub fn test_clock() -> TestClock {
925 let mut clock = TestClock::new();
926 clock.register_default_handler(TestCallback::default().into());
927 clock
928 }
929
930 #[rstest]
931 fn test_time_monotonicity(mut test_clock: TestClock) {
932 let initial_time = test_clock.timestamp_ns();
933 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
934 assert!(test_clock.timestamp_ns() > initial_time);
935 }
936
937 #[rstest]
938 fn test_timer_registration(mut test_clock: TestClock) {
939 test_clock
940 .set_time_alert_ns(
941 "test_timer",
942 (*test_clock.timestamp_ns() + 1000).into(),
943 None,
944 None,
945 )
946 .unwrap();
947 assert_eq!(test_clock.timer_count(), 1);
948 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
949 }
950
951 #[rstest]
952 fn test_timer_expiration(mut test_clock: TestClock) {
953 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
954 test_clock
955 .set_time_alert_ns("test_timer", alert_time, None, None)
956 .unwrap();
957 let events = test_clock.advance_time(alert_time, true);
958 assert_eq!(events.len(), 1);
959 assert_eq!(events[0].name.as_str(), "test_timer");
960 }
961
962 #[rstest]
963 fn test_timer_cancellation(mut test_clock: TestClock) {
964 test_clock
965 .set_time_alert_ns(
966 "test_timer",
967 (*test_clock.timestamp_ns() + 1000).into(),
968 None,
969 None,
970 )
971 .unwrap();
972 assert_eq!(test_clock.timer_count(), 1);
973 test_clock.cancel_timer("test_timer");
974 assert_eq!(test_clock.timer_count(), 0);
975 }
976
977 #[rstest]
978 fn test_time_advancement(mut test_clock: TestClock) {
979 let start_time = test_clock.timestamp_ns();
980 test_clock
981 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
982 .unwrap();
983 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
984 assert_eq!(events.len(), 2);
985 assert_eq!(*events[0].ts_event, *start_time + 1000);
986 assert_eq!(*events[1].ts_event, *start_time + 2000);
987 }
988
989 #[rstest]
990 fn test_default_and_custom_callbacks() {
991 let mut clock = TestClock::new();
992 let default_called = Rc::new(RefCell::new(false));
993 let custom_called = Rc::new(RefCell::new(false));
994
995 let default_callback = TestCallback::new(Rc::clone(&default_called));
996 let custom_callback = TestCallback::new(Rc::clone(&custom_called));
997
998 clock.register_default_handler(TimeEventCallback::from(default_callback));
999 clock
1000 .set_time_alert_ns(
1001 "default_timer",
1002 (*clock.timestamp_ns() + 1000).into(),
1003 None,
1004 None,
1005 )
1006 .unwrap();
1007 clock
1008 .set_time_alert_ns(
1009 "custom_timer",
1010 (*clock.timestamp_ns() + 1000).into(),
1011 Some(TimeEventCallback::from(custom_callback)),
1012 None,
1013 )
1014 .unwrap();
1015
1016 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
1017 let handlers = clock.match_handlers(events);
1018
1019 for handler in handlers {
1020 handler.callback.call(handler.event);
1021 }
1022
1023 assert!(*default_called.borrow());
1024 assert!(*custom_called.borrow());
1025 }
1026
1027 #[rstest]
1028 fn test_multiple_timers(mut test_clock: TestClock) {
1029 let start_time = test_clock.timestamp_ns();
1030 test_clock
1031 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
1032 .unwrap();
1033 test_clock
1034 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
1035 .unwrap();
1036 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
1037 assert_eq!(events.len(), 3);
1038 assert_eq!(events[0].name.as_str(), "timer1");
1039 assert_eq!(events[1].name.as_str(), "timer1");
1040 assert_eq!(events[2].name.as_str(), "timer2");
1041 }
1042
1043 #[rstest]
1044 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
1045 test_clock.set_time(UnixNanos::from(2000));
1046 let current_time = test_clock.timestamp_ns();
1047 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
1048
1049 test_clock
1051 .set_time_alert_ns("past_timer", past_time, None, Some(true))
1052 .unwrap();
1053
1054 assert_eq!(test_clock.timer_count(), 1);
1056 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
1057
1058 let next_time = test_clock.next_time_ns("past_timer").unwrap();
1060 assert!(next_time >= current_time);
1061 }
1062
1063 #[rstest]
1064 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
1065 test_clock.set_time(UnixNanos::from(2000));
1066 let current_time = test_clock.timestamp_ns();
1067 let past_time = current_time - 1000;
1068
1069 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
1071
1072 assert!(result.is_err());
1074 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
1075
1076 assert_eq!(test_clock.timer_count(), 0);
1078 assert!(test_clock.timer_names().is_empty());
1079 }
1080
1081 #[rstest]
1082 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
1083 test_clock.set_time(UnixNanos::from(2000));
1084 let current_time = test_clock.timestamp_ns();
1085 let start_time = current_time + 1000;
1086 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
1090 "invalid_timer",
1091 100,
1092 Some(start_time),
1093 Some(stop_time),
1094 None,
1095 None,
1096 None,
1097 );
1098
1099 assert!(result.is_err());
1101 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
1102
1103 assert_eq!(test_clock.timer_count(), 0);
1105 }
1106
1107 #[rstest]
1108 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
1109 let start_time = test_clock.timestamp_ns();
1110 let interval_ns = 1000;
1111
1112 test_clock
1113 .set_timer_ns(
1114 "fire_immediately_timer",
1115 interval_ns,
1116 Some(start_time),
1117 None,
1118 None,
1119 None,
1120 Some(true),
1121 )
1122 .unwrap();
1123
1124 let events = test_clock.advance_time(start_time + 2500, true);
1126
1127 assert_eq!(events.len(), 3);
1129 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
1133
1134 #[rstest]
1135 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
1136 let start_time = test_clock.timestamp_ns();
1137 let interval_ns = 1000;
1138
1139 test_clock
1140 .set_timer_ns(
1141 "normal_timer",
1142 interval_ns,
1143 Some(start_time),
1144 None,
1145 None,
1146 None,
1147 Some(false),
1148 )
1149 .unwrap();
1150
1151 let events = test_clock.advance_time(start_time + 2500, true);
1153
1154 assert_eq!(events.len(), 2);
1156 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
1159
1160 #[rstest]
1161 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
1162 let start_time = test_clock.timestamp_ns();
1163 let interval_ns = 1000;
1164
1165 test_clock
1167 .set_timer_ns(
1168 "default_timer",
1169 interval_ns,
1170 Some(start_time),
1171 None,
1172 None,
1173 None,
1174 None,
1175 )
1176 .unwrap();
1177
1178 let events = test_clock.advance_time(start_time + 1500, true);
1179
1180 assert_eq!(events.len(), 1);
1182 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1184
1185 #[rstest]
1186 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1187 test_clock.set_time(5000.into());
1188 let interval_ns = 1000;
1189
1190 test_clock
1191 .set_timer_ns(
1192 "zero_start_timer",
1193 interval_ns,
1194 None,
1195 None,
1196 None,
1197 None,
1198 Some(true),
1199 )
1200 .unwrap();
1201
1202 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1203
1204 assert_eq!(events.len(), 3);
1207 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1209 assert_eq!(*events[2].ts_event, 7000);
1210 }
1211
1212 #[rstest]
1213 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1214 let start_time = test_clock.timestamp_ns();
1215 let interval_ns = 1000;
1216
1217 test_clock
1219 .set_timer_ns(
1220 "immediate_timer",
1221 interval_ns,
1222 Some(start_time),
1223 None,
1224 None,
1225 None,
1226 Some(true),
1227 )
1228 .unwrap();
1229
1230 test_clock
1232 .set_timer_ns(
1233 "normal_timer",
1234 interval_ns,
1235 Some(start_time),
1236 None,
1237 None,
1238 None,
1239 Some(false),
1240 )
1241 .unwrap();
1242
1243 let events = test_clock.advance_time(start_time + 1500, true);
1244
1245 assert_eq!(events.len(), 3);
1247
1248 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1250 event_times.sort();
1251
1252 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1256
1257 #[rstest]
1258 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1259 let start_time = test_clock.timestamp_ns();
1260
1261 test_clock
1263 .set_timer_ns(
1264 "collision_timer",
1265 1000,
1266 Some(start_time),
1267 None,
1268 None,
1269 None,
1270 None,
1271 )
1272 .unwrap();
1273
1274 let result = test_clock.set_timer_ns(
1276 "collision_timer",
1277 2000,
1278 Some(start_time),
1279 None,
1280 None,
1281 None,
1282 None,
1283 );
1284
1285 assert!(result.is_ok());
1286 assert_eq!(test_clock.timer_count(), 1);
1288
1289 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1291 assert_eq!(next_time, start_time + 2000);
1293 }
1294
1295 #[rstest]
1296 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1297 let start_time = test_clock.timestamp_ns();
1298
1299 let result =
1301 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1302
1303 assert!(result.is_err());
1304 assert_eq!(test_clock.timer_count(), 0);
1305 }
1306
1307 #[rstest]
1308 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1309 let start_time = test_clock.timestamp_ns();
1310
1311 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1313
1314 assert!(result.is_err());
1315 assert_eq!(test_clock.timer_count(), 0);
1316 }
1317
1318 #[rstest]
1319 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1320 let start_time = test_clock.timestamp_ns();
1321 let interval_ns = 1000;
1322 let stop_time = start_time + interval_ns; test_clock
1325 .set_timer_ns(
1326 "exact_stop",
1327 interval_ns,
1328 Some(start_time),
1329 Some(stop_time),
1330 None,
1331 None,
1332 Some(true),
1333 )
1334 .unwrap();
1335
1336 let events = test_clock.advance_time(stop_time, true);
1337
1338 assert_eq!(events.len(), 2);
1340 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1343
1344 #[rstest]
1345 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1346 let start_time = test_clock.timestamp_ns();
1347 let interval_ns = 1000;
1348
1349 test_clock
1350 .set_timer_ns(
1351 "exact_advance",
1352 interval_ns,
1353 Some(start_time),
1354 None,
1355 None,
1356 None,
1357 Some(false),
1358 )
1359 .unwrap();
1360
1361 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1363 let events = test_clock.advance_time(next_time, true);
1364
1365 assert_eq!(events.len(), 1);
1366 assert_eq!(*events[0].ts_event, *next_time);
1367 }
1368
1369 #[rstest]
1370 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1371 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1381 "bar_timer",
1382 interval_ns,
1383 Some(bar_start_time),
1384 None,
1385 None,
1386 Some(false), Some(false), );
1389
1390 assert!(result.is_ok());
1392 assert_eq!(test_clock.timer_count(), 1);
1393
1394 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1396 assert_eq!(*next_time, 101_000);
1397 }
1398
1399 #[rstest]
1400 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1401 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1410 "past_event_timer",
1411 interval_ns,
1412 Some(past_start_time),
1413 None,
1414 None,
1415 Some(false), Some(false), );
1418
1419 assert!(result.is_err());
1421 assert!(
1422 result
1423 .unwrap_err()
1424 .to_string()
1425 .contains("would be in the past")
1426 );
1427 }
1428
1429 #[rstest]
1430 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1431 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1435
1436 let result = test_clock.set_timer_ns(
1439 "immediate_past_timer",
1440 interval_ns,
1441 Some(past_start_time),
1442 None,
1443 None,
1444 Some(false), Some(true), );
1447
1448 assert!(result.is_err());
1450 assert!(
1451 result
1452 .unwrap_err()
1453 .to_string()
1454 .contains("would be in the past")
1455 );
1456 }
1457
1458 #[rstest]
1459 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1460 let start_time = test_clock.timestamp_ns();
1461
1462 test_clock
1463 .set_timer_ns(
1464 "cancel_test",
1465 1000,
1466 Some(start_time),
1467 None,
1468 None,
1469 None,
1470 None,
1471 )
1472 .unwrap();
1473
1474 assert_eq!(test_clock.timer_count(), 1);
1475
1476 test_clock.cancel_timer("cancel_test");
1478
1479 assert_eq!(test_clock.timer_count(), 0);
1480
1481 let events = test_clock.advance_time(start_time + 2000, true);
1483 assert_eq!(events.len(), 0);
1484 }
1485
1486 #[rstest]
1487 fn test_cancel_all_timers(mut test_clock: TestClock) {
1488 test_clock
1490 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1491 .unwrap();
1492 test_clock
1493 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1494 .unwrap();
1495 test_clock
1496 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1497 .unwrap();
1498
1499 assert_eq!(test_clock.timer_count(), 3);
1500
1501 test_clock.cancel_timers();
1503
1504 assert_eq!(test_clock.timer_count(), 0);
1505
1506 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1508 assert_eq!(events.len(), 0);
1509 }
1510
1511 #[rstest]
1512 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1513 test_clock
1514 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1515 .unwrap();
1516
1517 assert_eq!(test_clock.timer_count(), 1);
1518
1519 test_clock.reset();
1521
1522 assert_eq!(test_clock.timer_count(), 0);
1523 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1525
1526 #[rstest]
1527 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1528 let current_time = test_clock.utc_now();
1529 let alert_time = current_time + chrono::Duration::seconds(1);
1530
1531 test_clock
1533 .set_time_alert("alert_test", alert_time, None, None)
1534 .unwrap();
1535
1536 assert_eq!(test_clock.timer_count(), 1);
1537 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1538
1539 let expected_ns = UnixNanos::from(alert_time);
1541 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1542
1543 let diff = if next_time >= expected_ns {
1545 next_time.as_u64() - expected_ns.as_u64()
1546 } else {
1547 expected_ns.as_u64() - next_time.as_u64()
1548 };
1549 assert!(
1550 diff < 1000,
1551 "Timer should be set within 1 microsecond of expected time"
1552 );
1553 }
1554
1555 #[rstest]
1556 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1557 let current_time = test_clock.utc_now();
1558 let start_time = current_time + chrono::Duration::seconds(1);
1559 let interval = Duration::from_millis(500);
1560
1561 test_clock
1563 .set_timer(
1564 "timer_test",
1565 interval,
1566 Some(start_time),
1567 None,
1568 None,
1569 None,
1570 None,
1571 )
1572 .unwrap();
1573
1574 assert_eq!(test_clock.timer_count(), 1);
1575 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1576
1577 let start_ns = UnixNanos::from(start_time);
1579 let interval_ns = interval.as_nanos() as u64;
1580
1581 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1582 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1586 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1587 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1588 }
1589
1590 #[rstest]
1591 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1592 let current_time = test_clock.utc_now();
1593 let start_time = current_time + chrono::Duration::seconds(1);
1594 let stop_time = current_time + chrono::Duration::seconds(3);
1595 let interval = Duration::from_secs(1);
1596
1597 test_clock
1599 .set_timer(
1600 "timer_with_stop",
1601 interval,
1602 Some(start_time),
1603 Some(stop_time),
1604 None,
1605 None,
1606 None,
1607 )
1608 .unwrap();
1609
1610 assert_eq!(test_clock.timer_count(), 1);
1611
1612 let stop_ns = UnixNanos::from(stop_time);
1614 let events = test_clock.advance_time(stop_ns + 1000, true);
1615
1616 assert_eq!(events.len(), 2);
1618
1619 let start_ns = UnixNanos::from(start_time);
1620 let interval_ns = interval.as_nanos() as u64;
1621 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1622 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1623 }
1624
1625 #[rstest]
1626 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1627 let current_time = test_clock.utc_now();
1628 let start_time = current_time + chrono::Duration::seconds(1);
1629 let interval = Duration::from_millis(500);
1630
1631 test_clock
1633 .set_timer(
1634 "immediate_timer",
1635 interval,
1636 Some(start_time),
1637 None,
1638 None,
1639 None,
1640 Some(true),
1641 )
1642 .unwrap();
1643
1644 let start_ns = UnixNanos::from(start_time);
1645 let interval_ns = interval.as_nanos() as u64;
1646
1647 let events = test_clock.advance_time(start_ns + interval_ns, true);
1649
1650 assert_eq!(events.len(), 2);
1652 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1655
1656 #[rstest]
1657 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1658 let current_time = test_clock.timestamp_ns();
1659
1660 test_clock
1662 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1663 .unwrap();
1664
1665 assert_eq!(test_clock.timer_count(), 1);
1666
1667 let events = test_clock.advance_time(current_time, true);
1669
1670 assert_eq!(events.len(), 1);
1672 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1673 assert_eq!(*events[0].ts_event, *current_time);
1674 }
1675}