1use std::{any::Any, collections::BTreeMap, fmt::Debug, ops::Deref, time::Duration};
19
20use ahash::AHashMap;
21use chrono::{DateTime, Utc};
22use nautilus_core::{
23 AtomicTime, UnixNanos,
24 correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
25 formatting::Separable,
26};
27use ustr::Ustr;
28
29use crate::timer::{
30 TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval,
31};
32
33pub trait Clock: Debug + Any {
39 fn utc_now(&self) -> DateTime<Utc> {
41 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
42 }
43
44 fn timestamp_ns(&self) -> UnixNanos;
46
47 fn timestamp_us(&self) -> u64;
49
50 fn timestamp_ms(&self) -> u64;
52
53 fn timestamp(&self) -> f64;
55
56 fn timer_names(&self) -> Vec<&str>;
58
59 fn timer_count(&self) -> usize;
61
62 fn timer_exists(&self, name: &Ustr) -> bool;
64
65 fn register_default_handler(&mut self, callback: TimeEventCallback);
68
69 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler;
73
74 #[allow(clippy::too_many_arguments)]
88 fn set_time_alert(
89 &mut self,
90 name: &str,
91 alert_time: DateTime<Utc>,
92 callback: Option<TimeEventCallback>,
93 allow_past: Option<bool>,
94 ) -> anyhow::Result<()> {
95 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
96 }
97
98 #[allow(clippy::too_many_arguments)]
119 fn set_time_alert_ns(
120 &mut self,
121 name: &str,
122 alert_time_ns: UnixNanos,
123 callback: Option<TimeEventCallback>,
124 allow_past: Option<bool>,
125 ) -> anyhow::Result<()>;
126
127 #[allow(clippy::too_many_arguments)]
143 fn set_timer(
144 &mut self,
145 name: &str,
146 interval: Duration,
147 start_time: Option<DateTime<Utc>>,
148 stop_time: Option<DateTime<Utc>>,
149 callback: Option<TimeEventCallback>,
150 allow_past: Option<bool>,
151 fire_immediately: Option<bool>,
152 ) -> anyhow::Result<()> {
153 self.set_timer_ns(
154 name,
155 interval.as_nanos() as u64,
156 start_time.map(UnixNanos::from),
157 stop_time.map(UnixNanos::from),
158 callback,
159 allow_past,
160 fire_immediately,
161 )
162 }
163
164 #[allow(clippy::too_many_arguments)]
192 fn set_timer_ns(
193 &mut self,
194 name: &str,
195 interval_ns: u64,
196 start_time_ns: Option<UnixNanos>,
197 stop_time_ns: Option<UnixNanos>,
198 callback: Option<TimeEventCallback>,
199 allow_past: Option<bool>,
200 fire_immediately: Option<bool>,
201 ) -> anyhow::Result<()>;
202
203 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
207
208 fn cancel_timer(&mut self, name: &str);
210
211 fn cancel_timers(&mut self);
213
214 fn reset(&mut self);
216}
217
218impl dyn Clock {
219 pub fn as_any(&self) -> &dyn std::any::Any {
221 self
222 }
223 pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
225 self
226 }
227}
228
229#[derive(Debug, Default)]
234pub struct CallbackRegistry {
235 default_callback: Option<TimeEventCallback>,
236 callbacks: AHashMap<Ustr, TimeEventCallback>,
237}
238
239impl CallbackRegistry {
240 #[must_use]
242 pub fn new() -> Self {
243 Self {
244 default_callback: None,
245 callbacks: AHashMap::new(),
246 }
247 }
248
249 pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
251 self.default_callback = Some(callback);
252 }
253
254 pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
256 self.callbacks.insert(name, callback);
257 }
258
259 #[must_use]
261 pub fn has_any_callback(&self, name: &Ustr) -> bool {
262 self.callbacks.contains_key(name) || self.default_callback.is_some()
263 }
264
265 #[must_use]
267 pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
268 self.callbacks
269 .get(name)
270 .cloned()
271 .or_else(|| self.default_callback.clone())
272 }
273
274 #[must_use]
280 pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
281 let callback = self
282 .get_callback(&event.name)
283 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
284
285 TimeEventHandler::new(event, callback)
286 }
287
288 pub fn clear(&mut self) {
290 self.callbacks.clear();
291 }
292}
293
294pub fn validate_and_prepare_time_alert(
302 name: &str,
303 mut alert_time_ns: UnixNanos,
304 allow_past: Option<bool>,
305 ts_now: UnixNanos,
306) -> anyhow::Result<(Ustr, UnixNanos)> {
307 check_valid_string_utf8(name, stringify!(name))?;
308
309 let name = Ustr::from(name);
310 let allow_past = allow_past.unwrap_or(true);
311
312 if alert_time_ns < ts_now {
313 if allow_past {
314 alert_time_ns = ts_now;
315 log::warn!(
316 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
317 alert_time_ns.to_rfc3339(),
318 );
319 } else {
320 anyhow::bail!(
321 "Timer '{name}' alert time {} was in the past (current time is {ts_now})",
322 alert_time_ns.to_rfc3339(),
323 );
324 }
325 }
326
327 Ok((name, alert_time_ns))
328}
329
330pub fn validate_and_prepare_timer(
339 name: &str,
340 interval_ns: u64,
341 start_time_ns: Option<UnixNanos>,
342 stop_time_ns: Option<UnixNanos>,
343 allow_past: Option<bool>,
344 fire_immediately: Option<bool>,
345 ts_now: UnixNanos,
346) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
347 check_valid_string_utf8(name, stringify!(name))?;
348 check_positive_u64(interval_ns, stringify!(interval_ns))?;
349
350 let name = Ustr::from(name);
351 let allow_past = allow_past.unwrap_or(true);
352 let fire_immediately = fire_immediately.unwrap_or(false);
353
354 let mut start_time_ns = start_time_ns.unwrap_or_default();
355
356 if start_time_ns == 0 {
357 start_time_ns = ts_now;
359 } else if !allow_past {
360 let next_event_time = if fire_immediately {
361 start_time_ns
362 } else {
363 start_time_ns + interval_ns
364 };
365
366 if next_event_time < ts_now {
367 anyhow::bail!(
368 "Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
369 next_event_time.to_rfc3339(),
370 );
371 }
372 }
373
374 if let Some(stop_time) = stop_time_ns {
375 if stop_time <= start_time_ns {
376 anyhow::bail!(
377 "Timer '{name}' stop time {} must be after start time {}",
378 stop_time.to_rfc3339(),
379 start_time_ns.to_rfc3339(),
380 );
381 }
382 if !allow_past && stop_time <= ts_now {
383 anyhow::bail!(
384 "Timer '{name}' stop time {} is in the past (current time is {ts_now})",
385 stop_time.to_rfc3339(),
386 );
387 }
388 }
389
390 Ok((
391 name,
392 start_time_ns,
393 stop_time_ns,
394 allow_past,
395 fire_immediately,
396 ))
397}
398
399#[derive(Debug)]
407pub struct TestClock {
408 time: AtomicTime,
409 timers: BTreeMap<Ustr, TestTimer>,
411 callbacks: CallbackRegistry,
412}
413
414impl TestClock {
415 #[must_use]
417 pub fn new() -> Self {
418 Self {
419 time: AtomicTime::new(false, UnixNanos::default()),
420 timers: BTreeMap::new(),
421 callbacks: CallbackRegistry::new(),
422 }
423 }
424
425 #[must_use]
427 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
428 &self.timers
429 }
430
431 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
448 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
449
450 let from_time_ns = self.time.get_time_ns();
451
452 assert!(
453 to_time_ns >= from_time_ns,
454 "Invariant violated: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
455 );
456
457 if set_time {
458 self.time.set_time(to_time_ns);
459 }
460
461 let mut events: Vec<TimeEvent> = Vec::new();
463 self.timers.retain(|_, timer| {
464 timer.advance(to_time_ns).for_each(|event| {
465 events.push(event);
466 });
467
468 !timer.is_expired()
469 });
470
471 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
472 log::warn!(
473 "Allocated {} time events during clock advancement from {} to {}, \
474 consider stopping the timer between large time ranges with no data points",
475 events.len().separate_with_commas(),
476 from_time_ns,
477 to_time_ns
478 );
479 }
480
481 events.sort_by(|a, b| a.ts_event.cmp(&b.ts_event));
482 events
483 }
484
485 #[must_use]
495 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandler> {
496 events
497 .into_iter()
498 .map(|event| self.callbacks.get_handler(event))
499 .collect()
500 }
501
502 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
503 if self.timer_exists(name) {
504 self.cancel_timer(name.as_str());
505 log::warn!("Timer '{name}' replaced");
506 }
507 }
508}
509
510impl Default for TestClock {
511 fn default() -> Self {
513 Self::new()
514 }
515}
516
517impl Deref for TestClock {
518 type Target = AtomicTime;
519
520 fn deref(&self) -> &Self::Target {
521 &self.time
522 }
523}
524
525impl Clock for TestClock {
526 fn timestamp_ns(&self) -> UnixNanos {
527 self.time.get_time_ns()
528 }
529
530 fn timestamp_us(&self) -> u64 {
531 self.time.get_time_us()
532 }
533
534 fn timestamp_ms(&self) -> u64 {
535 self.time.get_time_ms()
536 }
537
538 fn timestamp(&self) -> f64 {
539 self.time.get_time()
540 }
541
542 fn timer_names(&self) -> Vec<&str> {
543 self.timers
544 .iter()
545 .filter(|(_, timer)| !timer.is_expired())
546 .map(|(k, _)| k.as_str())
547 .collect()
548 }
549
550 fn timer_count(&self) -> usize {
551 self.timers
552 .iter()
553 .filter(|(_, timer)| !timer.is_expired())
554 .count()
555 }
556
557 fn timer_exists(&self, name: &Ustr) -> bool {
558 self.timers.contains_key(name)
559 }
560
561 fn register_default_handler(&mut self, callback: TimeEventCallback) {
562 self.callbacks.register_default_handler(callback);
563 }
564
565 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
571 self.callbacks.get_handler(event)
572 }
573
574 fn set_time_alert_ns(
575 &mut self,
576 name: &str,
577 alert_time_ns: UnixNanos,
578 callback: Option<TimeEventCallback>,
579 allow_past: Option<bool>,
580 ) -> anyhow::Result<()> {
581 let ts_now = self.get_time_ns();
582 let (name, alert_time_ns) =
583 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
584
585 self.replace_existing_timer_if_needed(&name);
586
587 check_predicate_true(
588 callback.is_some() | self.callbacks.has_any_callback(&name),
589 "No callbacks provided",
590 )?;
591
592 if let Some(callback) = callback {
593 self.callbacks.register_callback(name, callback);
594 }
595
596 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
598 let fire_immediately = alert_time_ns == ts_now;
599
600 let timer = TestTimer::new(
601 name,
602 interval_ns,
603 ts_now,
604 Some(alert_time_ns),
605 fire_immediately,
606 );
607 self.timers.insert(name, timer);
608
609 Ok(())
610 }
611
612 fn set_timer_ns(
613 &mut self,
614 name: &str,
615 interval_ns: u64,
616 start_time_ns: Option<UnixNanos>,
617 stop_time_ns: Option<UnixNanos>,
618 callback: Option<TimeEventCallback>,
619 allow_past: Option<bool>,
620 fire_immediately: Option<bool>,
621 ) -> anyhow::Result<()> {
622 let ts_now = self.get_time_ns();
623 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
624 validate_and_prepare_timer(
625 name,
626 interval_ns,
627 start_time_ns,
628 stop_time_ns,
629 allow_past,
630 fire_immediately,
631 ts_now,
632 )?;
633
634 check_predicate_true(
635 callback.is_some() | self.callbacks.has_any_callback(&name),
636 "No callbacks provided",
637 )?;
638
639 self.replace_existing_timer_if_needed(&name);
640
641 if let Some(callback) = callback {
642 self.callbacks.register_callback(name, callback);
643 }
644
645 let interval_ns = create_valid_interval(interval_ns);
646
647 let timer = TestTimer::new(
648 name,
649 interval_ns,
650 start_time_ns,
651 stop_time_ns,
652 fire_immediately,
653 );
654 self.timers.insert(name, timer);
655
656 Ok(())
657 }
658
659 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
660 self.timers
661 .get(&Ustr::from(name))
662 .map(|timer| timer.next_time_ns())
663 }
664
665 fn cancel_timer(&mut self, name: &str) {
666 let timer = self.timers.remove(&Ustr::from(name));
667 if let Some(mut timer) = timer {
668 timer.cancel();
669 }
670 }
671
672 fn cancel_timers(&mut self) {
673 for timer in &mut self.timers.values_mut() {
674 timer.cancel();
675 }
676
677 self.timers.clear();
678 }
679
680 fn reset(&mut self) {
681 self.time = AtomicTime::new(false, UnixNanos::default());
682 self.timers = BTreeMap::new();
683 self.callbacks.clear();
684 }
685}
686
687#[cfg(test)]
688mod tests {
689 use std::{
690 sync::{Arc, Mutex},
691 time::Duration,
692 };
693
694 use nautilus_core::{MUTEX_POISONED, UnixNanos};
695 use rstest::{fixture, rstest};
696 use ustr::Ustr;
697
698 use super::*;
699 use crate::timer::{TimeEvent, TimeEventCallback};
700
701 #[derive(Debug, Default)]
702 struct TestCallback {
703 called: Arc<Mutex<bool>>,
705 }
706
707 impl TestCallback {
708 fn new(called: Arc<Mutex<bool>>) -> Self {
709 Self { called }
710 }
711 }
712
713 impl From<TestCallback> for TimeEventCallback {
714 fn from(callback: TestCallback) -> Self {
715 Self::from(move |_event: TimeEvent| {
716 if let Ok(mut called) = callback.called.lock() {
717 *called = true;
718 }
719 })
720 }
721 }
722
723 #[fixture]
724 pub fn test_clock() -> TestClock {
725 let mut clock = TestClock::new();
726 clock.register_default_handler(TestCallback::default().into());
727 clock
728 }
729
730 #[rstest]
731 fn test_time_monotonicity(mut test_clock: TestClock) {
732 let initial_time = test_clock.timestamp_ns();
733 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
734 assert!(test_clock.timestamp_ns() > initial_time);
735 }
736
737 #[rstest]
738 fn test_timer_registration(mut test_clock: TestClock) {
739 test_clock
740 .set_time_alert_ns(
741 "test_timer",
742 (*test_clock.timestamp_ns() + 1000).into(),
743 None,
744 None,
745 )
746 .unwrap();
747 assert_eq!(test_clock.timer_count(), 1);
748 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
749 }
750
751 #[rstest]
752 fn test_timer_expiration(mut test_clock: TestClock) {
753 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
754 test_clock
755 .set_time_alert_ns("test_timer", alert_time, None, None)
756 .unwrap();
757 let events = test_clock.advance_time(alert_time, true);
758 assert_eq!(events.len(), 1);
759 assert_eq!(events[0].name.as_str(), "test_timer");
760 }
761
762 #[rstest]
763 fn test_timer_cancellation(mut test_clock: TestClock) {
764 test_clock
765 .set_time_alert_ns(
766 "test_timer",
767 (*test_clock.timestamp_ns() + 1000).into(),
768 None,
769 None,
770 )
771 .unwrap();
772 assert_eq!(test_clock.timer_count(), 1);
773 test_clock.cancel_timer("test_timer");
774 assert_eq!(test_clock.timer_count(), 0);
775 }
776
777 #[rstest]
778 fn test_time_advancement(mut test_clock: TestClock) {
779 let start_time = test_clock.timestamp_ns();
780 test_clock
781 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
782 .unwrap();
783 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
784 assert_eq!(events.len(), 2);
785 assert_eq!(*events[0].ts_event, *start_time + 1000);
786 assert_eq!(*events[1].ts_event, *start_time + 2000);
787 }
788
789 #[rstest]
790 fn test_default_and_custom_callbacks() {
791 let mut clock = TestClock::new();
792 let default_called = Arc::new(Mutex::new(false));
793 let custom_called = Arc::new(Mutex::new(false));
794
795 let default_callback = TestCallback::new(Arc::clone(&default_called));
796 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
797
798 clock.register_default_handler(TimeEventCallback::from(default_callback));
799 clock
800 .set_time_alert_ns(
801 "default_timer",
802 (*clock.timestamp_ns() + 1000).into(),
803 None,
804 None,
805 )
806 .unwrap();
807 clock
808 .set_time_alert_ns(
809 "custom_timer",
810 (*clock.timestamp_ns() + 1000).into(),
811 Some(TimeEventCallback::from(custom_callback)),
812 None,
813 )
814 .unwrap();
815
816 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
817 let handlers = clock.match_handlers(events);
818
819 for handler in handlers {
820 handler.callback.call(handler.event);
821 }
822
823 assert!(*default_called.lock().expect(MUTEX_POISONED));
824 assert!(*custom_called.lock().expect(MUTEX_POISONED));
825 }
826
827 #[rstest]
828 fn test_timer_with_rust_local_callback() {
829 use std::{cell::RefCell, rc::Rc};
830
831 let mut clock = TestClock::new();
832 let call_count = Rc::new(RefCell::new(0_u32));
833 let call_count_clone = Rc::clone(&call_count);
834
835 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event: TimeEvent| {
837 *call_count_clone.borrow_mut() += 1;
838 });
839
840 clock
841 .set_time_alert_ns(
842 "local_timer",
843 (*clock.timestamp_ns() + 1000).into(),
844 Some(TimeEventCallback::from(callback)),
845 None,
846 )
847 .unwrap();
848
849 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
850 let handlers = clock.match_handlers(events);
851
852 for handler in handlers {
853 handler.callback.call(handler.event);
854 }
855
856 assert_eq!(*call_count.borrow(), 1);
857 }
858
859 #[rstest]
860 fn test_multiple_timers(mut test_clock: TestClock) {
861 let start_time = test_clock.timestamp_ns();
862 test_clock
863 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
864 .unwrap();
865 test_clock
866 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
867 .unwrap();
868 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
869 assert_eq!(events.len(), 3);
870 assert_eq!(events[0].name.as_str(), "timer1");
871 assert_eq!(events[1].name.as_str(), "timer1");
872 assert_eq!(events[2].name.as_str(), "timer2");
873 }
874
875 #[rstest]
876 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
877 test_clock.set_time(UnixNanos::from(2000));
878 let current_time = test_clock.timestamp_ns();
879 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
880
881 test_clock
883 .set_time_alert_ns("past_timer", past_time, None, Some(true))
884 .unwrap();
885
886 assert_eq!(test_clock.timer_count(), 1);
888 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
889
890 let next_time = test_clock.next_time_ns("past_timer").unwrap();
892 assert!(next_time >= current_time);
893 }
894
895 #[rstest]
896 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
897 test_clock.set_time(UnixNanos::from(2000));
898 let current_time = test_clock.timestamp_ns();
899 let past_time = current_time - 1000;
900
901 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
903
904 assert!(result.is_err());
906 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
907
908 assert_eq!(test_clock.timer_count(), 0);
910 assert!(test_clock.timer_names().is_empty());
911 }
912
913 #[rstest]
914 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
915 test_clock.set_time(UnixNanos::from(2000));
916 let current_time = test_clock.timestamp_ns();
917 let start_time = current_time + 1000;
918 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
922 "invalid_timer",
923 100,
924 Some(start_time),
925 Some(stop_time),
926 None,
927 None,
928 None,
929 );
930
931 assert!(result.is_err());
933 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
934
935 assert_eq!(test_clock.timer_count(), 0);
937 }
938
939 #[rstest]
940 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
941 let start_time = test_clock.timestamp_ns();
942 let interval_ns = 1000;
943
944 test_clock
945 .set_timer_ns(
946 "fire_immediately_timer",
947 interval_ns,
948 Some(start_time),
949 None,
950 None,
951 None,
952 Some(true),
953 )
954 .unwrap();
955
956 let events = test_clock.advance_time(start_time + 2500, true);
958
959 assert_eq!(events.len(), 3);
961 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
965
966 #[rstest]
967 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
968 let start_time = test_clock.timestamp_ns();
969 let interval_ns = 1000;
970
971 test_clock
972 .set_timer_ns(
973 "normal_timer",
974 interval_ns,
975 Some(start_time),
976 None,
977 None,
978 None,
979 Some(false),
980 )
981 .unwrap();
982
983 let events = test_clock.advance_time(start_time + 2500, true);
985
986 assert_eq!(events.len(), 2);
988 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
991
992 #[rstest]
993 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
994 let start_time = test_clock.timestamp_ns();
995 let interval_ns = 1000;
996
997 test_clock
999 .set_timer_ns(
1000 "default_timer",
1001 interval_ns,
1002 Some(start_time),
1003 None,
1004 None,
1005 None,
1006 None,
1007 )
1008 .unwrap();
1009
1010 let events = test_clock.advance_time(start_time + 1500, true);
1011
1012 assert_eq!(events.len(), 1);
1014 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1016
1017 #[rstest]
1018 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1019 test_clock.set_time(5000.into());
1020 let interval_ns = 1000;
1021
1022 test_clock
1023 .set_timer_ns(
1024 "zero_start_timer",
1025 interval_ns,
1026 None,
1027 None,
1028 None,
1029 None,
1030 Some(true),
1031 )
1032 .unwrap();
1033
1034 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1035
1036 assert_eq!(events.len(), 3);
1039 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1041 assert_eq!(*events[2].ts_event, 7000);
1042 }
1043
1044 #[rstest]
1045 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1046 let start_time = test_clock.timestamp_ns();
1047 let interval_ns = 1000;
1048
1049 test_clock
1051 .set_timer_ns(
1052 "immediate_timer",
1053 interval_ns,
1054 Some(start_time),
1055 None,
1056 None,
1057 None,
1058 Some(true),
1059 )
1060 .unwrap();
1061
1062 test_clock
1064 .set_timer_ns(
1065 "normal_timer",
1066 interval_ns,
1067 Some(start_time),
1068 None,
1069 None,
1070 None,
1071 Some(false),
1072 )
1073 .unwrap();
1074
1075 let events = test_clock.advance_time(start_time + 1500, true);
1076
1077 assert_eq!(events.len(), 3);
1079
1080 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1082 event_times.sort_unstable();
1083
1084 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1088
1089 #[rstest]
1090 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1091 let start_time = test_clock.timestamp_ns();
1092
1093 test_clock
1095 .set_timer_ns(
1096 "collision_timer",
1097 1000,
1098 Some(start_time),
1099 None,
1100 None,
1101 None,
1102 None,
1103 )
1104 .unwrap();
1105
1106 let result = test_clock.set_timer_ns(
1108 "collision_timer",
1109 2000,
1110 Some(start_time),
1111 None,
1112 None,
1113 None,
1114 None,
1115 );
1116
1117 assert!(result.is_ok());
1118 assert_eq!(test_clock.timer_count(), 1);
1120
1121 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1123 assert_eq!(next_time, start_time + 2000);
1125 }
1126
1127 #[rstest]
1128 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1129 let start_time = test_clock.timestamp_ns();
1130
1131 let result =
1133 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1134
1135 assert!(result.is_err());
1136 assert_eq!(test_clock.timer_count(), 0);
1137 }
1138
1139 #[rstest]
1140 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1141 let start_time = test_clock.timestamp_ns();
1142
1143 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1145
1146 assert!(result.is_err());
1147 assert_eq!(test_clock.timer_count(), 0);
1148 }
1149
1150 #[rstest]
1151 fn test_timer_exists(mut test_clock: TestClock) {
1152 let name = Ustr::from("exists_timer");
1153 assert!(!test_clock.timer_exists(&name));
1154
1155 test_clock
1156 .set_time_alert_ns(
1157 name.as_str(),
1158 (*test_clock.timestamp_ns() + 1_000).into(),
1159 None,
1160 None,
1161 )
1162 .unwrap();
1163
1164 assert!(test_clock.timer_exists(&name));
1165 }
1166
1167 #[rstest]
1168 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1169 test_clock.set_time(UnixNanos::from(10_000));
1170 let current = test_clock.timestamp_ns();
1171
1172 let result = test_clock.set_timer_ns(
1173 "past_stop",
1174 10_000,
1175 Some(current - 500),
1176 Some(current - 100),
1177 None,
1178 Some(false),
1179 None,
1180 );
1181
1182 let err = result.expect_err("expected stop time validation error");
1183 let err_msg = err.to_string();
1184 assert!(err_msg.contains("stop time"));
1185 assert!(err_msg.contains("in the past"));
1186 }
1187
1188 #[rstest]
1189 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1190 let current = test_clock.timestamp_ns();
1191
1192 let result = test_clock.set_timer_ns(
1193 "future_stop",
1194 1_000,
1195 Some(current),
1196 Some(current + 10_000),
1197 None,
1198 Some(false),
1199 None,
1200 );
1201
1202 assert!(result.is_ok());
1203 }
1204
1205 #[rstest]
1206 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1207 let start_time = test_clock.timestamp_ns();
1208 let interval_ns = 1000;
1209 let stop_time = start_time + interval_ns; test_clock
1212 .set_timer_ns(
1213 "exact_stop",
1214 interval_ns,
1215 Some(start_time),
1216 Some(stop_time),
1217 None,
1218 None,
1219 Some(true),
1220 )
1221 .unwrap();
1222
1223 let events = test_clock.advance_time(stop_time, true);
1224
1225 assert_eq!(events.len(), 2);
1227 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1230
1231 #[rstest]
1232 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1233 let start_time = test_clock.timestamp_ns();
1234 let interval_ns = 1000;
1235
1236 test_clock
1237 .set_timer_ns(
1238 "exact_advance",
1239 interval_ns,
1240 Some(start_time),
1241 None,
1242 None,
1243 None,
1244 Some(false),
1245 )
1246 .unwrap();
1247
1248 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1250 let events = test_clock.advance_time(next_time, true);
1251
1252 assert_eq!(events.len(), 1);
1253 assert_eq!(*events[0].ts_event, *next_time);
1254 }
1255
1256 #[rstest]
1257 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1258 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1268 "bar_timer",
1269 interval_ns,
1270 Some(bar_start_time),
1271 None,
1272 None,
1273 Some(false), Some(false), );
1276
1277 assert!(result.is_ok());
1279 assert_eq!(test_clock.timer_count(), 1);
1280
1281 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1283 assert_eq!(*next_time, 101_000);
1284 }
1285
1286 #[rstest]
1287 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1288 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1297 "past_event_timer",
1298 interval_ns,
1299 Some(past_start_time),
1300 None,
1301 None,
1302 Some(false), Some(false), );
1305
1306 assert!(result.is_err());
1308 assert!(
1309 result
1310 .unwrap_err()
1311 .to_string()
1312 .contains("would be in the past")
1313 );
1314 }
1315
1316 #[rstest]
1317 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1318 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1322
1323 let result = test_clock.set_timer_ns(
1326 "immediate_past_timer",
1327 interval_ns,
1328 Some(past_start_time),
1329 None,
1330 None,
1331 Some(false), Some(true), );
1334
1335 assert!(result.is_err());
1337 assert!(
1338 result
1339 .unwrap_err()
1340 .to_string()
1341 .contains("would be in the past")
1342 );
1343 }
1344
1345 #[rstest]
1346 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1347 let start_time = test_clock.timestamp_ns();
1348
1349 test_clock
1350 .set_timer_ns(
1351 "cancel_test",
1352 1000,
1353 Some(start_time),
1354 None,
1355 None,
1356 None,
1357 None,
1358 )
1359 .unwrap();
1360
1361 assert_eq!(test_clock.timer_count(), 1);
1362
1363 test_clock.cancel_timer("cancel_test");
1365
1366 assert_eq!(test_clock.timer_count(), 0);
1367
1368 let events = test_clock.advance_time(start_time + 2000, true);
1370 assert_eq!(events.len(), 0);
1371 }
1372
1373 #[rstest]
1374 fn test_cancel_all_timers(mut test_clock: TestClock) {
1375 test_clock
1377 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1378 .unwrap();
1379 test_clock
1380 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1381 .unwrap();
1382 test_clock
1383 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1384 .unwrap();
1385
1386 assert_eq!(test_clock.timer_count(), 3);
1387
1388 test_clock.cancel_timers();
1390
1391 assert_eq!(test_clock.timer_count(), 0);
1392
1393 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1395 assert_eq!(events.len(), 0);
1396 }
1397
1398 #[rstest]
1399 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1400 test_clock
1401 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1402 .unwrap();
1403
1404 assert_eq!(test_clock.timer_count(), 1);
1405
1406 test_clock.reset();
1408
1409 assert_eq!(test_clock.timer_count(), 0);
1410 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1412
1413 #[rstest]
1414 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1415 let current_time = test_clock.utc_now();
1416 let alert_time = current_time + chrono::Duration::seconds(1);
1417
1418 test_clock
1420 .set_time_alert("alert_test", alert_time, None, None)
1421 .unwrap();
1422
1423 assert_eq!(test_clock.timer_count(), 1);
1424 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1425
1426 let expected_ns = UnixNanos::from(alert_time);
1428 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1429
1430 let diff = if next_time >= expected_ns {
1432 next_time.as_u64() - expected_ns.as_u64()
1433 } else {
1434 expected_ns.as_u64() - next_time.as_u64()
1435 };
1436 assert!(
1437 diff < 1000,
1438 "Timer should be set within 1 microsecond of expected time"
1439 );
1440 }
1441
1442 #[rstest]
1443 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1444 let current_time = test_clock.utc_now();
1445 let start_time = current_time + chrono::Duration::seconds(1);
1446 let interval = Duration::from_millis(500);
1447
1448 test_clock
1450 .set_timer(
1451 "timer_test",
1452 interval,
1453 Some(start_time),
1454 None,
1455 None,
1456 None,
1457 None,
1458 )
1459 .unwrap();
1460
1461 assert_eq!(test_clock.timer_count(), 1);
1462 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1463
1464 let start_ns = UnixNanos::from(start_time);
1466 let interval_ns = interval.as_nanos() as u64;
1467
1468 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1469 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1473 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1474 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1475 }
1476
1477 #[rstest]
1478 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1479 let current_time = test_clock.utc_now();
1480 let start_time = current_time + chrono::Duration::seconds(1);
1481 let stop_time = current_time + chrono::Duration::seconds(3);
1482 let interval = Duration::from_secs(1);
1483
1484 test_clock
1486 .set_timer(
1487 "timer_with_stop",
1488 interval,
1489 Some(start_time),
1490 Some(stop_time),
1491 None,
1492 None,
1493 None,
1494 )
1495 .unwrap();
1496
1497 assert_eq!(test_clock.timer_count(), 1);
1498
1499 let stop_ns = UnixNanos::from(stop_time);
1501 let events = test_clock.advance_time(stop_ns + 1000, true);
1502
1503 assert_eq!(events.len(), 2);
1505
1506 let start_ns = UnixNanos::from(start_time);
1507 let interval_ns = interval.as_nanos() as u64;
1508 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1509 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1510 }
1511
1512 #[rstest]
1513 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1514 let current_time = test_clock.utc_now();
1515 let start_time = current_time + chrono::Duration::seconds(1);
1516 let interval = Duration::from_millis(500);
1517
1518 test_clock
1520 .set_timer(
1521 "immediate_timer",
1522 interval,
1523 Some(start_time),
1524 None,
1525 None,
1526 None,
1527 Some(true),
1528 )
1529 .unwrap();
1530
1531 let start_ns = UnixNanos::from(start_time);
1532 let interval_ns = interval.as_nanos() as u64;
1533
1534 let events = test_clock.advance_time(start_ns + interval_ns, true);
1536
1537 assert_eq!(events.len(), 2);
1539 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1542
1543 #[rstest]
1544 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1545 let current_time = test_clock.timestamp_ns();
1546
1547 test_clock
1549 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1550 .unwrap();
1551
1552 assert_eq!(test_clock.timer_count(), 1);
1553
1554 let events = test_clock.advance_time(current_time, true);
1556
1557 assert_eq!(events.len(), 1);
1559 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1560 assert_eq!(*events[0].ts_event, *current_time);
1561 }
1562
1563 #[rstest]
1564 fn test_cancel_and_reschedule_same_name(mut test_clock: TestClock) {
1565 let start = test_clock.timestamp_ns();
1566
1567 test_clock
1568 .set_time_alert_ns("timer", UnixNanos::from(*start + 1000), None, None)
1569 .unwrap();
1570 assert_eq!(test_clock.timer_count(), 1);
1571
1572 test_clock.cancel_timer("timer");
1573 assert_eq!(test_clock.timer_count(), 0);
1574
1575 test_clock
1576 .set_time_alert_ns("timer", UnixNanos::from(*start + 2000), None, None)
1577 .unwrap();
1578 assert_eq!(test_clock.timer_count(), 1);
1579
1580 let events = test_clock.advance_time(UnixNanos::from(*start + 1500), true);
1581 assert!(events.is_empty());
1582
1583 let events = test_clock.advance_time(UnixNanos::from(*start + 2000), true);
1584 assert_eq!(events.len(), 1);
1585 assert_eq!(*events[0].ts_event, *start + 2000);
1586 }
1587
1588 #[rstest]
1589 fn test_multiple_timers_same_timestamp_all_fire(mut test_clock: TestClock) {
1590 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1591
1592 for i in 0..5 {
1593 test_clock
1594 .set_time_alert_ns(&format!("timer_{i}"), fire_time, None, None)
1595 .unwrap();
1596 }
1597 assert_eq!(test_clock.timer_count(), 5);
1598
1599 let events = test_clock.advance_time(fire_time, true);
1600 assert_eq!(events.len(), 5);
1601 for event in &events {
1602 assert_eq!(*event.ts_event, *fire_time);
1603 }
1604 }
1605
1606 #[rstest]
1607 fn test_events_ordered_by_timestamp_after_advance() {
1608 let mut clock = TestClock::new();
1609 clock.register_default_handler(TestCallback::default().into());
1610 let start = clock.timestamp_ns();
1611
1612 clock
1613 .set_time_alert_ns("third", UnixNanos::from(*start + 300), None, None)
1614 .unwrap();
1615 clock
1616 .set_time_alert_ns("first", UnixNanos::from(*start + 100), None, None)
1617 .unwrap();
1618 clock
1619 .set_time_alert_ns("second", UnixNanos::from(*start + 200), None, None)
1620 .unwrap();
1621
1622 let events = clock.advance_time(UnixNanos::from(*start + 400), true);
1623 assert_eq!(events.len(), 3);
1624 assert_eq!(events[0].name.as_str(), "first");
1625 assert_eq!(events[1].name.as_str(), "second");
1626 assert_eq!(events[2].name.as_str(), "third");
1627 }
1628
1629 #[rstest]
1630 fn test_large_interval_does_not_overflow(mut test_clock: TestClock) {
1631 let start = test_clock.timestamp_ns();
1632 let large_interval: u64 = 1_000_000_000 * 60 * 60 * 24 * 365; test_clock
1635 .set_timer_ns(
1636 "large_interval",
1637 large_interval,
1638 Some(start),
1639 None,
1640 None,
1641 None,
1642 None,
1643 )
1644 .unwrap();
1645
1646 let events = test_clock.advance_time(UnixNanos::from(*start + large_interval), true);
1647 assert_eq!(events.len(), 1);
1648 assert_eq!(*events[0].ts_event, *start + large_interval);
1649 }
1650
1651 #[rstest]
1652 fn test_near_zero_interval_fires_correctly(mut test_clock: TestClock) {
1653 let start = test_clock.timestamp_ns();
1654
1655 test_clock
1656 .set_timer_ns("tiny", 1, Some(start), None, None, None, None)
1657 .unwrap();
1658
1659 let events = test_clock.advance_time(UnixNanos::from(*start + 10), true);
1660 assert_eq!(events.len(), 10);
1661
1662 for i in 1..events.len() {
1663 assert!(events[i].ts_event >= events[i - 1].ts_event);
1664 }
1665 }
1666
1667 #[rstest]
1668 fn test_repeated_advance_to_same_time_no_double_fire(mut test_clock: TestClock) {
1669 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1670
1671 test_clock
1672 .set_time_alert_ns("once", fire_time, None, None)
1673 .unwrap();
1674
1675 let events1 = test_clock.advance_time(fire_time, true);
1676 assert_eq!(events1.len(), 1);
1677
1678 let events2 = test_clock.advance_time(fire_time, true);
1679 assert!(events2.is_empty());
1680 }
1681
1682 #[rstest]
1683 fn test_advance_with_no_timers(mut test_clock: TestClock) {
1684 let start = test_clock.timestamp_ns();
1685
1686 let events = test_clock.advance_time(UnixNanos::from(*start + 1000), true);
1687 assert!(events.is_empty());
1688 assert_eq!(*test_clock.timestamp_ns(), *start + 1000);
1689 }
1690}