1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23};
24
25use nautilus_core::{
26 UUID4, UnixNanos,
27 correctness::{FAILED, check_valid_string_utf8},
28};
29#[cfg(feature = "python")]
30use pyo3::{Py, PyAny, Python};
31use ustr::Ustr;
32
33#[must_use]
39pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
40 NonZeroU64::new(std::cmp::max(interval_ns, 1)).expect("`interval_ns` must be positive")
41}
42
43#[repr(C)]
44#[derive(Clone, Debug, PartialEq, Eq)]
45#[cfg_attr(
46 feature = "python",
47 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
48)]
49pub struct TimeEvent {
54 pub name: Ustr,
56 pub event_id: UUID4,
58 pub ts_event: UnixNanos,
60 pub ts_init: UnixNanos,
62}
63
64impl TimeEvent {
65 #[must_use]
71 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
72 Self {
73 name,
74 event_id,
75 ts_event,
76 ts_init,
77 }
78 }
79}
80
81impl Display for TimeEvent {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
86 stringify!(TimeEvent),
87 self.name,
88 self.event_id,
89 self.ts_event,
90 self.ts_init
91 )
92 }
93}
94
95#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
102pub struct ScheduledTimeEvent(pub TimeEvent);
103
104impl ScheduledTimeEvent {
105 #[must_use]
107 pub const fn new(event: TimeEvent) -> Self {
108 Self(event)
109 }
110
111 #[must_use]
113 pub fn into_inner(self) -> TimeEvent {
114 self.0
115 }
116}
117
118impl PartialOrd for ScheduledTimeEvent {
119 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for ScheduledTimeEvent {
125 fn cmp(&self, other: &Self) -> Ordering {
126 other.0.ts_event.cmp(&self.0.ts_event)
128 }
129}
130
131pub enum TimeEventCallback {
133 #[cfg(feature = "python")]
134 Python(Py<PyAny>),
135 Rust(Rc<dyn Fn(TimeEvent)>),
136}
137
138impl Clone for TimeEventCallback {
139 fn clone(&self) -> Self {
140 match self {
141 #[cfg(feature = "python")]
142 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
143 Self::Rust(cb) => Self::Rust(cb.clone()),
144 }
145 }
146}
147
148impl Debug for TimeEventCallback {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 #[cfg(feature = "python")]
152 Self::Python(_) => f.write_str("Python callback"),
153 Self::Rust(_) => f.write_str("Rust callback"),
154 }
155 }
156}
157
158impl TimeEventCallback {
159 #[must_use]
164 pub const fn is_rust(&self) -> bool {
165 matches!(self, Self::Rust(_))
166 }
167
168 pub fn call(&self, event: TimeEvent) {
174 match self {
175 #[cfg(feature = "python")]
176 Self::Python(callback) => {
177 Python::attach(|py| {
178 callback.call1(py, (event,)).unwrap();
179 });
180 }
181 Self::Rust(callback) => callback(event),
182 }
183 }
184}
185
186impl<F> From<F> for TimeEventCallback
187where
188 F: Fn(TimeEvent) + 'static,
189{
190 fn from(value: F) -> Self {
191 Self::Rust(Rc::new(value))
192 }
193}
194
195impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
196 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
197 Self::Rust(value)
198 }
199}
200
201#[cfg(feature = "python")]
202impl From<Py<PyAny>> for TimeEventCallback {
203 fn from(value: Py<PyAny>) -> Self {
204 Self::Python(value)
205 }
206}
207
208#[allow(unsafe_code)]
218unsafe impl Send for TimeEventCallback {}
219#[allow(unsafe_code)]
220unsafe impl Sync for TimeEventCallback {}
221
222#[repr(C)]
223#[derive(Clone, Debug)]
224pub struct TimeEventHandlerV2 {
229 pub event: TimeEvent,
231 pub callback: TimeEventCallback,
233}
234
235impl TimeEventHandlerV2 {
236 #[must_use]
238 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
239 Self { event, callback }
240 }
241
242 pub fn run(self) {
248 let Self { event, callback } = self;
249 callback.call(event);
250 }
251}
252
253impl PartialOrd for TimeEventHandlerV2 {
254 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
255 Some(self.cmp(other))
256 }
257}
258
259impl PartialEq for TimeEventHandlerV2 {
260 fn eq(&self, other: &Self) -> bool {
261 self.event.ts_event == other.event.ts_event
262 }
263}
264
265impl Eq for TimeEventHandlerV2 {}
266
267impl Ord for TimeEventHandlerV2 {
268 fn cmp(&self, other: &Self) -> Ordering {
269 self.event.ts_event.cmp(&other.event.ts_event)
270 }
271}
272
273#[derive(Clone, Copy, Debug)]
282pub struct TestTimer {
283 pub name: Ustr,
285 pub interval_ns: NonZeroU64,
287 pub start_time_ns: UnixNanos,
289 pub stop_time_ns: Option<UnixNanos>,
291 pub fire_immediately: bool,
293 next_time_ns: UnixNanos,
294 is_expired: bool,
295}
296
297impl TestTimer {
298 #[must_use]
304 pub fn new(
305 name: Ustr,
306 interval_ns: NonZeroU64,
307 start_time_ns: UnixNanos,
308 stop_time_ns: Option<UnixNanos>,
309 fire_immediately: bool,
310 ) -> Self {
311 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
312
313 let next_time_ns = if fire_immediately {
314 start_time_ns
315 } else {
316 start_time_ns + interval_ns.get()
317 };
318
319 Self {
320 name,
321 interval_ns,
322 start_time_ns,
323 stop_time_ns,
324 fire_immediately,
325 next_time_ns,
326 is_expired: false,
327 }
328 }
329
330 #[must_use]
332 pub const fn next_time_ns(&self) -> UnixNanos {
333 self.next_time_ns
334 }
335
336 #[must_use]
338 pub const fn is_expired(&self) -> bool {
339 self.is_expired
340 }
341
342 #[must_use]
343 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
344 TimeEvent {
345 name: self.name,
346 event_id,
347 ts_event: self.next_time_ns,
348 ts_init,
349 }
350 }
351
352 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
358 let advances = if self.next_time_ns <= to_time_ns {
360 (to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get() + 1
361 } else {
362 0
363 };
364 self.take(advances as usize).map(|(event, _)| event)
365 }
366
367 pub const fn cancel(&mut self) {
371 self.is_expired = true;
372 }
373}
374
375impl Iterator for TestTimer {
376 type Item = (TimeEvent, UnixNanos);
377
378 fn next(&mut self) -> Option<Self::Item> {
379 if self.is_expired {
380 None
381 } else {
382 if let Some(stop_time_ns) = self.stop_time_ns
384 && self.next_time_ns > stop_time_ns
385 {
386 self.is_expired = true;
387 return None;
388 }
389
390 let item = (
391 TimeEvent {
392 name: self.name,
393 event_id: UUID4::new(),
394 ts_event: self.next_time_ns,
395 ts_init: self.next_time_ns,
396 },
397 self.next_time_ns,
398 );
399
400 if let Some(stop_time_ns) = self.stop_time_ns
402 && self.next_time_ns == stop_time_ns
403 {
404 self.is_expired = true;
405 }
406
407 self.next_time_ns += self.interval_ns;
408
409 Some(item)
410 }
411 }
412}
413
414#[cfg(test)]
415mod tests {
416 use std::num::NonZeroU64;
417
418 use nautilus_core::UnixNanos;
419 use rstest::*;
420 use ustr::Ustr;
421
422 use super::{TestTimer, TimeEvent};
423
424 #[rstest]
425 fn test_test_timer_pop_event() {
426 let mut timer = TestTimer::new(
427 Ustr::from("TEST_TIMER"),
428 NonZeroU64::new(1).unwrap(),
429 UnixNanos::from(1),
430 None,
431 false,
432 );
433
434 assert!(timer.next().is_some());
435 assert!(timer.next().is_some());
436 timer.is_expired = true;
437 assert!(timer.next().is_none());
438 }
439
440 #[rstest]
441 fn test_test_timer_advance_within_next_time_ns() {
442 let mut timer = TestTimer::new(
443 Ustr::from("TEST_TIMER"),
444 NonZeroU64::new(5).unwrap(),
445 UnixNanos::default(),
446 None,
447 false,
448 );
449 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
450 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
451 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
452 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
453 assert_eq!(timer.next_time_ns, 5);
454 assert!(!timer.is_expired);
455 }
456
457 #[rstest]
458 fn test_test_timer_advance_up_to_next_time_ns() {
459 let mut timer = TestTimer::new(
460 Ustr::from("TEST_TIMER"),
461 NonZeroU64::new(1).unwrap(),
462 UnixNanos::default(),
463 None,
464 false,
465 );
466 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
467 assert!(!timer.is_expired);
468 }
469
470 #[rstest]
471 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
472 let mut timer = TestTimer::new(
473 Ustr::from("TEST_TIMER"),
474 NonZeroU64::new(1).unwrap(),
475 UnixNanos::default(),
476 Some(UnixNanos::from(2)),
477 false,
478 );
479 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
480 assert!(timer.is_expired);
481 }
482
483 #[rstest]
484 fn test_test_timer_advance_beyond_next_time_ns() {
485 let mut timer = TestTimer::new(
486 Ustr::from("TEST_TIMER"),
487 NonZeroU64::new(1).unwrap(),
488 UnixNanos::default(),
489 Some(UnixNanos::from(5)),
490 false,
491 );
492 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
493 assert!(timer.is_expired);
494 }
495
496 #[rstest]
497 fn test_test_timer_advance_beyond_stop_time() {
498 let mut timer = TestTimer::new(
499 Ustr::from("TEST_TIMER"),
500 NonZeroU64::new(1).unwrap(),
501 UnixNanos::default(),
502 Some(UnixNanos::from(5)),
503 false,
504 );
505 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
506 assert!(timer.is_expired);
507 }
508
509 #[rstest]
510 fn test_test_timer_advance_exact_boundary() {
511 let mut timer = TestTimer::new(
512 Ustr::from("TEST_TIMER"),
513 NonZeroU64::new(5).unwrap(),
514 UnixNanos::from(0),
515 None,
516 false,
517 );
518 assert_eq!(
519 timer.advance(UnixNanos::from(5)).count(),
520 1,
521 "Expected one event at the 5 ns boundary"
522 );
523 assert_eq!(
524 timer.advance(UnixNanos::from(10)).count(),
525 1,
526 "Expected one event at the 10 ns boundary"
527 );
528 }
529
530 #[rstest]
531 fn test_test_timer_fire_immediately_true() {
532 let mut timer = TestTimer::new(
533 Ustr::from("TEST_TIMER"),
534 NonZeroU64::new(5).unwrap(),
535 UnixNanos::from(10),
536 None,
537 true, );
539
540 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
542
543 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
545 assert_eq!(events.len(), 1);
546 assert_eq!(events[0].ts_event, UnixNanos::from(10));
547
548 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
550 }
551
552 #[rstest]
553 fn test_test_timer_fire_immediately_false() {
554 let mut timer = TestTimer::new(
555 Ustr::from("TEST_TIMER"),
556 NonZeroU64::new(5).unwrap(),
557 UnixNanos::from(10),
558 None,
559 false, );
561
562 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
564
565 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
567
568 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
570 assert_eq!(events.len(), 1);
571 assert_eq!(events[0].ts_event, UnixNanos::from(15));
572 }
573
574 use proptest::prelude::*;
579
580 #[derive(Clone, Debug)]
581 enum TimerOperation {
582 AdvanceTime(u64),
583 Cancel,
584 }
585
586 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
587 prop_oneof![
588 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
589 2 => Just(TimerOperation::Cancel),
590 ]
591 }
592
593 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
594 (
595 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
600 }
601
602 fn timer_test_strategy()
603 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
604 (
605 prop::collection::vec(timer_operation_strategy(), 5..=50),
606 timer_config_strategy(),
607 )
608 }
609
610 #[allow(clippy::needless_collect)] fn test_timer_with_operations(
612 operations: Vec<TimerOperation>,
613 (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
614 ) {
615 let mut timer = TestTimer::new(
616 Ustr::from("PROP_TEST_TIMER"),
617 NonZeroU64::new(interval_ns).unwrap(),
618 UnixNanos::from(start_time_ns),
619 stop_time_ns.map(UnixNanos::from),
620 fire_immediately,
621 );
622
623 let mut current_time = start_time_ns;
624
625 for operation in operations {
626 if timer.is_expired() {
627 break;
628 }
629
630 match operation {
631 TimerOperation::AdvanceTime(delta) => {
632 let to_time = current_time + delta;
633 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
634 current_time = to_time;
635
636 for (i, event) in events.iter().enumerate() {
638 if i > 0 {
640 assert!(
641 event.ts_event >= events[i - 1].ts_event,
642 "Events should be in chronological order"
643 );
644 }
645
646 assert!(
648 event.ts_event.as_u64() >= start_time_ns,
649 "Event timestamp should not be before start time"
650 );
651
652 assert!(
653 event.ts_event.as_u64() <= to_time,
654 "Event timestamp should not be after advance time"
655 );
656
657 if let Some(stop_time_ns) = stop_time_ns {
659 assert!(
660 event.ts_event.as_u64() <= stop_time_ns,
661 "Event timestamp should not exceed stop time"
662 );
663 }
664 }
665 }
666 TimerOperation::Cancel => {
667 timer.cancel();
668 assert!(timer.is_expired(), "Timer should be expired after cancel");
669 }
670 }
671
672 if !timer.is_expired() {
674 let expected_interval_multiple = if fire_immediately {
676 timer.next_time_ns().as_u64() >= start_time_ns
677 } else {
678 timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
679 };
680 assert!(
681 expected_interval_multiple,
682 "Next time should respect interval spacing"
683 );
684
685 if let Some(stop_time_ns) = stop_time_ns
688 && timer.next_time_ns().as_u64() > stop_time_ns
689 {
690 let mut test_timer = timer;
692 let events: Vec<TimeEvent> = test_timer
693 .advance(UnixNanos::from(stop_time_ns + 1))
694 .collect();
695 assert!(
696 events.is_empty() || test_timer.is_expired(),
697 "Timer should not generate events beyond stop time"
698 );
699 }
700 }
701 }
702
703 if !timer.is_expired()
706 && let Some(stop_time_ns) = stop_time_ns
707 {
708 let events: Vec<TimeEvent> = timer
709 .advance(UnixNanos::from(stop_time_ns + 1000))
710 .collect();
711 assert!(
712 timer.is_expired() || events.is_empty(),
713 "Timer should eventually expire or stop generating events"
714 );
715 }
716 }
717
718 proptest! {
719 #[rstest]
720 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
721 test_timer_with_operations(operations, config);
722 }
723
724 #[rstest]
725 fn prop_timer_interval_consistency(
726 interval_ns in 1u64..=100u64,
727 start_time_ns in 0u64..=50u64,
728 fire_immediately in prop::bool::ANY,
729 advance_count in 1usize..=20usize,
730 ) {
731 let mut timer = TestTimer::new(
732 Ustr::from("CONSISTENCY_TEST"),
733 NonZeroU64::new(interval_ns).unwrap(),
734 UnixNanos::from(start_time_ns),
735 None, fire_immediately,
737 );
738
739 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
740
741 for _ in 0..advance_count {
742 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
743
744 if !events.is_empty() {
745 prop_assert_eq!(events.len(), 1);
747 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
748 }
749
750 previous_event_time += interval_ns;
751 }
752 }
753 }
754}