1#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27 cmp::Ordering,
28 fmt::{Debug, Display},
29 num::NonZeroU64,
30 rc::Rc,
31 sync::{
32 Arc,
33 atomic::{self, AtomicU64},
34 },
35};
36
37use nautilus_core::{
38 UUID4, UnixNanos,
39 correctness::{FAILED, check_valid_string},
40 datetime::floor_to_nearest_microsecond,
41 time::get_atomic_clock_realtime,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46 task::JoinHandle,
47 time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
/// Converts a raw nanosecond interval into a [`NonZeroU64`].
///
/// # Panics
///
/// Panics if `interval_ns` is zero.
#[must_use]
pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    match NonZeroU64::new(interval_ns) {
        Some(interval) => interval,
        None => panic!("`interval_ns` must be positive"),
    }
}
62
/// A named, uniquely identified event produced when a timer fires.
///
/// Note that equality and ordering use different fields: `PartialEq` compares
/// `event_id`, while `Ord` compares `ts_event` (in reverse).
#[repr(C)]
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
#[derive(Eq)]
pub struct TimeEvent {
    /// The event name; the timers in this file copy their own `name` into it.
    pub name: Ustr,
    /// The unique identifier of this event.
    pub event_id: UUID4,
    /// The time the event was scheduled for; the timers in this file set it to
    /// their `next_time_ns` at the moment of firing.
    pub ts_event: UnixNanos,
    /// The time associated with creating the event object. `LiveTimer` passes the
    /// clock reading taken after the tick; `TestTimer`'s iterator reuses `ts_event`.
    pub ts_init: UnixNanos,
}
84
85impl PartialOrd for TimeEvent {
87 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92impl Ord for TimeEvent {
94 fn cmp(&self, other: &Self) -> Ordering {
95 other.ts_event.cmp(&self.ts_event)
96 }
97}
98
99impl TimeEvent {
100 #[must_use]
106 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
107 Self {
108 name,
109 event_id,
110 ts_event,
111 ts_init,
112 }
113 }
114}
115
116impl Display for TimeEvent {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 write!(
119 f,
120 "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
121 self.name, self.event_id, self.ts_event, self.ts_init
122 )
123 }
124}
125
126impl PartialEq for TimeEvent {
127 fn eq(&self, other: &Self) -> bool {
128 self.event_id == other.event_id
129 }
130}
131
/// The (unsized) type of a Rust callback that receives a [`TimeEvent`] by value.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);
133
/// A callback invoked with a [`TimeEvent`], implemented either in Python or in Rust.
///
/// Cloning is cheap: both variants hold a reference-counted pointer.
#[derive(Clone)]
pub enum TimeEventCallback {
    /// A Python callable (only available with the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PyObject>),
    /// A Rust closure behind an `Rc`.
    Rust(Rc<RustTimeEventCallback>),
}
140
141impl Debug for TimeEventCallback {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 match self {
144 #[cfg(feature = "python")]
145 Self::Python(_) => f.write_str("Python callback"),
146 Self::Rust(_) => f.write_str("Rust callback"),
147 }
148 }
149}
150
151impl TimeEventCallback {
152 pub fn call(&self, event: TimeEvent) {
153 match self {
154 #[cfg(feature = "python")]
155 Self::Python(callback) => {
156 Python::with_gil(|py| {
157 callback.call1(py, (event,)).unwrap();
158 });
159 }
160 Self::Rust(callback) => callback(event),
161 }
162 }
163}
164
165impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
166 fn from(value: Rc<RustTimeEventCallback>) -> Self {
167 Self::Rust(value)
168 }
169}
170
/// Wraps a Python callable in the `Python` variant, placing it behind an `Arc`.
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    fn from(value: PyObject) -> Self {
        let shared = Arc::new(value);
        TimeEventCallback::Python(shared)
    }
}
177
// SAFETY: NOTE(review) — the `Rust` variant holds an `Rc`, whose reference count is
// not atomic, so these impls are sound only if a `Rust` callback (and every clone of
// it) is never actually used or dropped on more than one thread. Nothing in this file
// enforces that; the invariant must be upheld by callers — TODO confirm.
unsafe impl Send for TimeEventCallback {}
// SAFETY: see the note on the `Send` impl directly above; the same caveat applies.
unsafe impl Sync for TimeEventCallback {}
181
/// Pairs a [`TimeEvent`] with the callback that should receive it.
///
/// Handlers are compared and ordered by `event.ts_event` only.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The event to deliver.
    pub event: TimeEvent,
    /// The callback that is invoked with `event` by `run`.
    pub callback: TimeEventCallback,
}
194
195impl TimeEventHandlerV2 {
196 #[must_use]
198 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
199 Self { event, callback }
200 }
201
202 pub fn run(self) {
203 let Self { event, callback } = self;
204 callback.call(event);
205 }
206}
207
208impl PartialOrd for TimeEventHandlerV2 {
209 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
210 Some(self.cmp(other))
211 }
212}
213
214impl PartialEq for TimeEventHandlerV2 {
215 fn eq(&self, other: &Self) -> bool {
216 self.event.ts_event == other.event.ts_event
217 }
218}
219
220impl Eq for TimeEventHandlerV2 {}
221
222impl Ord for TimeEventHandlerV2 {
223 fn cmp(&self, other: &Self) -> Ordering {
224 self.event.ts_event.cmp(&other.event.ts_event)
225 }
226}
227
/// A timer that produces events only when it is explicitly iterated or advanced,
/// rather than by a background task.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The timer name; copied into every event the timer produces.
    pub name: Ustr,
    /// The (non-zero) spacing between consecutive events.
    pub interval_ns: NonZeroU64,
    /// The reference time; the first event is scheduled one interval after it.
    pub start_time_ns: UnixNanos,
    /// Optional stop time; once an event at or beyond it has been produced the
    /// timer is marked expired.
    pub stop_time_ns: Option<UnixNanos>,
    /// The scheduled time of the next event.
    next_time_ns: UnixNanos,
    /// Whether the timer has stopped producing events.
    is_expired: bool,
}
245
246impl TestTimer {
247 #[must_use]
253 pub fn new(
254 name: Ustr,
255 interval_ns: NonZeroU64,
256 start_time_ns: UnixNanos,
257 stop_time_ns: Option<UnixNanos>,
258 ) -> Self {
259 check_valid_string(name, stringify!(name)).expect(FAILED);
260
261 Self {
262 name,
263 interval_ns,
264 start_time_ns,
265 stop_time_ns,
266 next_time_ns: start_time_ns + interval_ns.get(),
267 is_expired: false,
268 }
269 }
270
271 #[must_use]
273 pub const fn next_time_ns(&self) -> UnixNanos {
274 self.next_time_ns
275 }
276
277 #[must_use]
279 pub const fn is_expired(&self) -> bool {
280 self.is_expired
281 }
282
283 #[must_use]
284 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
285 TimeEvent {
286 name: self.name,
287 event_id,
288 ts_event: self.next_time_ns,
289 ts_init,
290 }
291 }
292
293 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
299 let advances = to_time_ns
300 .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
301 / self.interval_ns.get();
302 self.take(advances as usize).map(|(event, _)| event)
303 }
304
305 pub const fn cancel(&mut self) {
309 self.is_expired = true;
310 }
311}
312
313impl Iterator for TestTimer {
314 type Item = (TimeEvent, UnixNanos);
315
316 fn next(&mut self) -> Option<Self::Item> {
317 if self.is_expired {
318 None
319 } else {
320 let item = (
321 TimeEvent {
322 name: self.name,
323 event_id: UUID4::new(),
324 ts_event: self.next_time_ns,
325 ts_init: self.next_time_ns,
326 },
327 self.next_time_ns,
328 );
329
330 if let Some(stop_time_ns) = self.stop_time_ns {
332 if self.next_time_ns >= stop_time_ns {
333 self.is_expired = true;
334 }
335 }
336
337 self.next_time_ns += self.interval_ns;
338
339 Some(item)
340 }
341 }
342}
343
/// A timer driven by a task spawned on the shared runtime (see `start`).
#[derive(Debug)]
pub struct LiveTimer {
    /// The timer name; used for the events it produces and in log messages.
    pub name: Ustr,
    /// The (non-zero) spacing between consecutive ticks.
    pub interval_ns: NonZeroU64,
    /// The reference time; the first alert is scheduled one interval after it.
    pub start_time_ns: UnixNanos,
    /// Optional stop time after which the timer task exits.
    pub stop_time_ns: Option<UnixNanos>,
    /// The next scheduled alert time, shared with the spawned task which updates it.
    next_time_ns: Arc<AtomicU64>,
    /// The callback associated with this timer.
    callback: TimeEventCallback,
    /// Handle of the spawned timer task; `None` until `start` has been called.
    task_handle: Option<JoinHandle<()>>,
    /// Shared heap that the timer task pushes events onto (`clock_v2` only).
    #[cfg(feature = "clock_v2")]
    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
}
364
365impl LiveTimer {
366 #[must_use]
372 #[cfg(not(feature = "clock_v2"))]
373 pub fn new(
374 name: Ustr,
375 interval_ns: NonZeroU64,
376 start_time_ns: UnixNanos,
377 stop_time_ns: Option<UnixNanos>,
378 callback: TimeEventCallback,
379 ) -> Self {
380 check_valid_string(name, stringify!(name)).expect(FAILED);
381
382 log::debug!("Creating timer '{name}'");
383 Self {
384 name,
385 interval_ns,
386 start_time_ns,
387 stop_time_ns,
388 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
389 callback,
390 task_handle: None,
391 }
392 }
393
394 #[must_use]
400 #[cfg(feature = "clock_v2")]
401 pub fn new(
402 name: Ustr,
403 interval_ns: NonZeroU64,
404 start_time_ns: UnixNanos,
405 stop_time_ns: Option<UnixNanos>,
406 callback: TimeEventCallback,
407 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
408 ) -> Self {
409 check_valid_string(name, stringify!(name)).expect(FAILED);
410
411 log::debug!("Creating timer '{name}'");
412 Self {
413 name,
414 interval_ns,
415 start_time_ns,
416 stop_time_ns,
417 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
418 callback,
419 heap,
420 task_handle: None,
421 }
422 }
423
424 #[must_use]
428 pub fn next_time_ns(&self) -> UnixNanos {
429 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
430 }
431
432 #[must_use]
437 pub fn is_expired(&self) -> bool {
438 self.task_handle
439 .as_ref()
440 .is_some_and(tokio::task::JoinHandle::is_finished)
441 }
442
443 #[allow(unused_variables)] pub fn start(&mut self) {
449 let event_name = self.name;
450 let stop_time_ns = self.stop_time_ns;
451 let interval_ns = self.interval_ns.get();
452 let callback = self.callback.clone();
453
454 let clock = get_atomic_clock_realtime();
456 let now_ns = clock.get_time_ns();
457
458 let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
460 if next_time_ns <= now_ns {
461 log::warn!(
462 "Timer '{}' alert time {} was in the past, adjusted to current time for immediate fire",
463 event_name,
464 next_time_ns,
465 );
466 next_time_ns = now_ns.into();
467 self.next_time_ns
468 .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
469 }
470
471 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
473 let next_time_atomic = self.next_time_ns.clone();
474
475 #[cfg(feature = "clock_v2")]
476 let heap = self.heap.clone();
477
478 let rt = get_runtime();
479 let handle = rt.spawn(async move {
480 let clock = get_atomic_clock_realtime();
481
482 let overhead = Duration::from_millis(1);
484 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
485 let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
486 let start = Instant::now() + delay;
487
488 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
489
490 loop {
491 timer.tick().await;
494 let now_ns = clock.get_time_ns();
495
496 #[cfg(feature = "python")]
497 {
498 match callback {
499 TimeEventCallback::Python(ref callback) => {
500 call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
501 }
502 TimeEventCallback::Rust(_) => {}
504 }
505 }
506
507 #[cfg(feature = "clock_v2")]
508 {
509 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
510 heap.lock().await.push(event);
511 }
512
513 next_time_ns += interval_ns;
515 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
516
517 if let Some(stop_time_ns) = stop_time_ns {
519 if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
520 break; }
522 }
523 }
524 });
525
526 self.task_handle = Some(handle);
527 }
528
529 pub fn cancel(&mut self) {
533 log::debug!("Cancel timer '{}'", self.name);
534 if let Some(ref handle) = self.task_handle {
535 handle.abort();
536 }
537 }
538}
539
/// Builds a [`TimeEvent`], wraps it in a `PyCapsule` and passes it to the Python
/// `callback` while holding the GIL.
///
/// An error returned by the callback is logged rather than propagated.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(
    name: Ustr,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
    callback: &PyObject,
) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        let event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
        let capsule: PyObject = PyCapsule::new(py, event, None)
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
563
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{TestTimer, TimeEvent};

    /// Builds a timer named `TEST_TIMER` with the given interval, start and stop time.
    fn make_timer(
        interval_ns: u64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<UnixNanos>,
    ) -> TestTimer {
        TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            start_time_ns,
            stop_time_ns,
        )
    }

    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = make_timer(1, UnixNanos::from(1), None);

        // Without a stop time the timer keeps yielding until it is marked expired.
        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = make_timer(5, UnixNanos::default(), None);

        // Advancing to times before the first alert (5 ns) must not produce events.
        for to_time_ns in 1..=3 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }
        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = make_timer(1, UnixNanos::default(), None);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = make_timer(1, UnixNanos::default(), Some(UnixNanos::from(2)));

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = make_timer(1, UnixNanos::default(), Some(UnixNanos::from(5)));

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = make_timer(1, UnixNanos::default(), Some(UnixNanos::from(5)));

        // Advancing past the stop time yields only the events up to the stop time.
        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = make_timer(5, UnixNanos::from(0), None);

        let first: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(first.len(), 1, "Expected one event at the 5 ns boundary");

        let second: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(second.len(), 1, "Expected one event at the 10 ns boundary");
    }
}