1#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27 cmp::Ordering,
28 fmt::{Debug, Display},
29 num::NonZeroU64,
30 rc::Rc,
31 sync::{
32 atomic::{self, AtomicU64},
33 Arc,
34 },
35};
36
37use nautilus_core::{
38 correctness::{check_valid_string, FAILED},
39 datetime::floor_to_nearest_microsecond,
40 time::get_atomic_clock_realtime,
41 UnixNanos, UUID4,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46 task::JoinHandle,
47 time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
53#[must_use]
60pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
61 NonZeroU64::new(interval_ns).expect("`interval_ns` must be positive")
62}
63
64#[repr(C)]
65#[derive(Clone, Debug)]
66#[cfg_attr(
67 feature = "python",
68 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
69)]
70#[derive(Eq)]
75pub struct TimeEvent {
76 pub name: Ustr,
78 pub event_id: UUID4,
80 pub ts_event: UnixNanos,
82 pub ts_init: UnixNanos,
84}
85
86impl PartialOrd for TimeEvent {
88 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
89 Some(self.cmp(other))
90 }
91}
92
93impl Ord for TimeEvent {
95 fn cmp(&self, other: &Self) -> Ordering {
96 other.ts_event.cmp(&self.ts_event)
97 }
98}
99
100impl TimeEvent {
101 #[must_use]
107 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
108 Self {
109 name,
110 event_id,
111 ts_event,
112 ts_init,
113 }
114 }
115}
116
117impl Display for TimeEvent {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(
120 f,
121 "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
122 self.name, self.event_id, self.ts_event, self.ts_init
123 )
124 }
125}
126
127impl PartialEq for TimeEvent {
128 fn eq(&self, other: &Self) -> bool {
129 self.event_id == other.event_id
130 }
131}
132
133pub type RustTimeEventCallback = dyn Fn(TimeEvent);
134
135#[derive(Clone)]
136pub enum TimeEventCallback {
137 #[cfg(feature = "python")]
138 Python(Arc<PyObject>),
139 Rust(Rc<RustTimeEventCallback>),
140}
141
142impl Debug for TimeEventCallback {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 match self {
145 #[cfg(feature = "python")]
146 Self::Python(_) => f.write_str("Python callback"),
147 Self::Rust(_) => f.write_str("Rust callback"),
148 }
149 }
150}
151
152impl TimeEventCallback {
153 pub fn call(&self, event: TimeEvent) {
154 match self {
155 #[cfg(feature = "python")]
156 Self::Python(callback) => {
157 Python::with_gil(|py| {
158 callback.call1(py, (event,)).unwrap();
159 });
160 }
161 Self::Rust(callback) => callback(event),
162 }
163 }
164}
165
166impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
167 fn from(value: Rc<RustTimeEventCallback>) -> Self {
168 Self::Rust(value)
169 }
170}
171
172#[cfg(feature = "python")]
173impl From<PyObject> for TimeEventCallback {
174 fn from(value: PyObject) -> Self {
175 Self::Python(Arc::new(value))
176 }
177}
178
179unsafe impl Send for TimeEventCallback {}
181unsafe impl Sync for TimeEventCallback {}
182
183#[repr(C)]
184#[derive(Clone, Debug)]
185pub struct TimeEventHandlerV2 {
190 pub event: TimeEvent,
192 pub callback: TimeEventCallback,
194}
195
196impl TimeEventHandlerV2 {
197 #[must_use]
199 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
200 Self { event, callback }
201 }
202
203 pub fn run(self) {
204 let Self { event, callback } = self;
205 callback.call(event);
206 }
207}
208
209impl PartialOrd for TimeEventHandlerV2 {
210 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
211 Some(self.cmp(other))
212 }
213}
214
215impl PartialEq for TimeEventHandlerV2 {
216 fn eq(&self, other: &Self) -> bool {
217 self.event.ts_event == other.event.ts_event
218 }
219}
220
221impl Eq for TimeEventHandlerV2 {}
222
223impl Ord for TimeEventHandlerV2 {
224 fn cmp(&self, other: &Self) -> Ordering {
225 self.event.ts_event.cmp(&other.event.ts_event)
226 }
227}
228
229#[derive(Clone, Copy, Debug)]
234pub struct TestTimer {
235 pub name: Ustr,
237 pub interval_ns: NonZeroU64,
239 pub start_time_ns: UnixNanos,
241 pub stop_time_ns: Option<UnixNanos>,
243 next_time_ns: UnixNanos,
244 is_expired: bool,
245}
246
247impl TestTimer {
248 #[must_use]
255 pub fn new(
256 name: &str,
257 interval_ns: NonZeroU64,
258 start_time_ns: UnixNanos,
259 stop_time_ns: Option<UnixNanos>,
260 ) -> Self {
261 check_valid_string(name, stringify!(name)).expect(FAILED);
262
263 Self {
264 name: Ustr::from(name),
265 interval_ns,
266 start_time_ns,
267 stop_time_ns,
268 next_time_ns: start_time_ns + interval_ns.get(),
269 is_expired: false,
270 }
271 }
272
273 #[must_use]
275 pub const fn next_time_ns(&self) -> UnixNanos {
276 self.next_time_ns
277 }
278
279 #[must_use]
281 pub const fn is_expired(&self) -> bool {
282 self.is_expired
283 }
284
285 #[must_use]
286 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
287 TimeEvent {
288 name: self.name,
289 event_id,
290 ts_event: self.next_time_ns,
291 ts_init,
292 }
293 }
294
295 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
301 let advances = to_time_ns
302 .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
303 / self.interval_ns.get();
304 self.take(advances as usize).map(|(event, _)| event)
305 }
306
307 pub const fn cancel(&mut self) {
311 self.is_expired = true;
312 }
313}
314
315impl Iterator for TestTimer {
316 type Item = (TimeEvent, UnixNanos);
317
318 fn next(&mut self) -> Option<Self::Item> {
319 if self.is_expired {
320 None
321 } else {
322 let item = (
323 TimeEvent {
324 name: self.name,
325 event_id: UUID4::new(),
326 ts_event: self.next_time_ns,
327 ts_init: self.next_time_ns,
328 },
329 self.next_time_ns,
330 );
331
332 if let Some(stop_time_ns) = self.stop_time_ns {
334 if self.next_time_ns >= stop_time_ns {
335 self.is_expired = true;
336 }
337 }
338
339 self.next_time_ns += self.interval_ns;
340
341 Some(item)
342 }
343 }
344}
345
346#[derive(Debug)]
351pub struct LiveTimer {
352 pub name: Ustr,
354 pub interval_ns: NonZeroU64,
356 pub start_time_ns: UnixNanos,
358 pub stop_time_ns: Option<UnixNanos>,
360 next_time_ns: Arc<AtomicU64>,
361 callback: TimeEventCallback,
362 task_handle: Option<JoinHandle<()>>,
363 #[cfg(feature = "clock_v2")]
364 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
365}
366
367impl LiveTimer {
368 #[must_use]
376 #[cfg(not(feature = "clock_v2"))]
377 pub fn new(
378 name: &str,
379 interval_ns: NonZeroU64,
380 start_time_ns: UnixNanos,
381 stop_time_ns: Option<UnixNanos>,
382 callback: TimeEventCallback,
383 ) -> Self {
384 check_valid_string(name, stringify!(name)).expect(FAILED);
385
386 log::debug!("Creating timer '{name}'");
387 Self {
388 name: Ustr::from(name),
389 interval_ns,
390 start_time_ns,
391 stop_time_ns,
392 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
393 callback,
394 task_handle: None,
395 }
396 }
397
398 #[must_use]
406 #[cfg(feature = "clock_v2")]
407 pub fn new(
408 name: &str,
409 interval_ns: NonZeroU64,
410 start_time_ns: UnixNanos,
411 stop_time_ns: Option<UnixNanos>,
412 callback: TimeEventCallback,
413 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
414 ) -> Self {
415 check_valid_string(name, stringify!(name)).expect(FAILED);
416
417 log::debug!("Creating timer '{name}'");
418 Self {
419 name: Ustr::from(name),
420 interval_ns,
421 start_time_ns,
422 stop_time_ns,
423 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
424 callback,
425 heap,
426 task_handle: None,
427 }
428 }
429
430 #[must_use]
434 pub fn next_time_ns(&self) -> UnixNanos {
435 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
436 }
437
438 #[must_use]
443 pub fn is_expired(&self) -> bool {
444 self.task_handle
445 .as_ref()
446 .is_some_and(tokio::task::JoinHandle::is_finished)
447 }
448
449 pub fn start(&mut self) {
454 let event_name = self.name;
455 let stop_time_ns = self.stop_time_ns;
456 let next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
457 let next_time_atomic = self.next_time_ns.clone();
458 let interval_ns = self.interval_ns.get();
459
460 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
462
463 #[cfg(feature = "clock_v2")]
464 let heap = self.heap.clone();
465
466 let callback = self.callback.clone();
467 let rt = get_runtime();
468
469 let handle = rt.spawn(async move {
470 let clock = get_atomic_clock_realtime();
471 let now_ns = clock.get_time_ns();
472
473 let overhead = Duration::from_millis(1);
475 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
476 let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
477 let start = Instant::now() + delay;
478
479 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
480
481 loop {
482 timer.tick().await;
485 let now_ns = clock.get_time_ns();
486
487 #[cfg(feature = "python")]
488 {
489 match callback {
490 TimeEventCallback::Python(ref callback) => {
491 call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
492 }
493 TimeEventCallback::Rust(_) => {}
495 }
496 }
497
498 #[cfg(feature = "clock_v2")]
499 {
500 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
501 heap.lock().await.push(event);
502 }
503
504 next_time_ns += interval_ns;
506 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
507
508 if let Some(stop_time_ns) = stop_time_ns {
510 if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
511 break; }
513 }
514 }
515 });
516
517 self.task_handle = Some(handle);
518 }
519
520 pub fn cancel(&mut self) {
524 log::debug!("Cancel timer '{}'", self.name);
525 if let Some(ref handle) = self.task_handle {
526 handle.abort();
527 }
528 }
529}
530
531#[cfg(feature = "python")]
532fn call_python_with_time_event(
533 name: Ustr,
534 ts_event: UnixNanos,
535 ts_init: UnixNanos,
536 callback: &PyObject,
537) {
538 use pyo3::{types::PyCapsule, IntoPy};
539
540 Python::with_gil(|py| {
541 let event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
543 let capsule: PyObject = PyCapsule::new(py, event, None)
544 .expect("Error creating `PyCapsule`")
545 .into_py(py);
546
547 match callback.call1(py, (capsule,)) {
548 Ok(_) => {}
549 Err(e) => tracing::error!("Error on callback: {e:?}"),
550 }
551 });
552}
553
554#[cfg(test)]
558mod tests {
559 use std::num::NonZeroU64;
560
561 use nautilus_core::UnixNanos;
562 use rstest::*;
563
564 use super::{TestTimer, TimeEvent};
565
566 #[rstest]
567 fn test_test_timer_pop_event() {
568 let mut timer = TestTimer::new(
569 "test_timer",
570 NonZeroU64::new(1).unwrap(),
571 UnixNanos::from(1),
572 None,
573 );
574
575 assert!(timer.next().is_some());
576 assert!(timer.next().is_some());
577 timer.is_expired = true;
578 assert!(timer.next().is_none());
579 }
580
581 #[rstest]
582 fn test_test_timer_advance_within_next_time_ns() {
583 let mut timer = TestTimer::new(
584 "test_timer",
585 NonZeroU64::new(5).unwrap(),
586 UnixNanos::default(),
587 None,
588 );
589 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
590 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
591 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
592 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
593 assert_eq!(timer.next_time_ns, 5);
594 assert!(!timer.is_expired);
595 }
596
597 #[rstest]
598 fn test_test_timer_advance_up_to_next_time_ns() {
599 let mut timer = TestTimer::new(
600 "test_timer",
601 NonZeroU64::new(1).unwrap(),
602 UnixNanos::default(),
603 None,
604 );
605 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
606 assert!(!timer.is_expired);
607 }
608
609 #[rstest]
610 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
611 let mut timer = TestTimer::new(
612 "test_timer",
613 NonZeroU64::new(1).unwrap(),
614 UnixNanos::default(),
615 Some(UnixNanos::from(2)),
616 );
617 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
618 assert!(timer.is_expired);
619 }
620
621 #[rstest]
622 fn test_test_timer_advance_beyond_next_time_ns() {
623 let mut timer = TestTimer::new(
624 "test_timer",
625 NonZeroU64::new(1).unwrap(),
626 UnixNanos::default(),
627 Some(UnixNanos::from(5)),
628 );
629 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
630 assert!(timer.is_expired);
631 }
632
633 #[rstest]
634 fn test_test_timer_advance_beyond_stop_time() {
635 let mut timer = TestTimer::new(
636 "test_timer",
637 NonZeroU64::new(1).unwrap(),
638 UnixNanos::default(),
639 Some(UnixNanos::from(5)),
640 );
641 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
642 assert!(timer.is_expired);
643 }
644
645 #[rstest]
646 fn test_test_timer_advance_exact_boundary() {
647 let mut timer = TestTimer::new(
648 "boundary_timer",
649 NonZeroU64::new(5).unwrap(),
650 UnixNanos::from(0),
651 None,
652 );
653 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
654 assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
655
656 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
657 assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
658 }
659}