1#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27 cmp::Ordering,
28 fmt::{Debug, Display},
29 num::NonZeroU64,
30 rc::Rc,
31 sync::{
32 Arc,
33 atomic::{self, AtomicU64},
34 },
35};
36
37use nautilus_core::{
38 UUID4, UnixNanos,
39 correctness::{FAILED, check_valid_string},
40 datetime::floor_to_nearest_microsecond,
41 python::IntoPyObjectNautilusExt,
42 time::get_atomic_clock_realtime,
43};
44#[cfg(feature = "python")]
45use pyo3::{PyObject, Python};
46use tokio::{
47 task::JoinHandle,
48 time::{Duration, Instant},
49};
50use ustr::Ustr;
51
52use crate::runtime::get_runtime;
53
54#[must_use]
61pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
62 NonZeroU64::new(interval_ns).expect("`interval_ns` must be positive")
63}
64
65#[repr(C)]
66#[derive(Clone, Debug)]
67#[cfg_attr(
68 feature = "python",
69 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
70)]
71#[derive(Eq)]
76pub struct TimeEvent {
77 pub name: Ustr,
79 pub event_id: UUID4,
81 pub ts_event: UnixNanos,
83 pub ts_init: UnixNanos,
85}
86
87impl PartialOrd for TimeEvent {
89 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
90 Some(self.cmp(other))
91 }
92}
93
94impl Ord for TimeEvent {
96 fn cmp(&self, other: &Self) -> Ordering {
97 other.ts_event.cmp(&self.ts_event)
98 }
99}
100
101impl TimeEvent {
102 #[must_use]
108 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
109 Self {
110 name,
111 event_id,
112 ts_event,
113 ts_init,
114 }
115 }
116}
117
118impl Display for TimeEvent {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 write!(
121 f,
122 "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
123 self.name, self.event_id, self.ts_event, self.ts_init
124 )
125 }
126}
127
128impl PartialEq for TimeEvent {
129 fn eq(&self, other: &Self) -> bool {
130 self.event_id == other.event_id
131 }
132}
133
134pub type RustTimeEventCallback = dyn Fn(TimeEvent);
135
136#[derive(Clone)]
137pub enum TimeEventCallback {
138 #[cfg(feature = "python")]
139 Python(Arc<PyObject>),
140 Rust(Rc<RustTimeEventCallback>),
141}
142
143impl Debug for TimeEventCallback {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 match self {
146 #[cfg(feature = "python")]
147 Self::Python(_) => f.write_str("Python callback"),
148 Self::Rust(_) => f.write_str("Rust callback"),
149 }
150 }
151}
152
153impl TimeEventCallback {
154 pub fn call(&self, event: TimeEvent) {
155 match self {
156 #[cfg(feature = "python")]
157 Self::Python(callback) => {
158 Python::with_gil(|py| {
159 callback.call1(py, (event,)).unwrap();
160 });
161 }
162 Self::Rust(callback) => callback(event),
163 }
164 }
165}
166
167impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
168 fn from(value: Rc<RustTimeEventCallback>) -> Self {
169 Self::Rust(value)
170 }
171}
172
173#[cfg(feature = "python")]
174impl From<PyObject> for TimeEventCallback {
175 fn from(value: PyObject) -> Self {
176 Self::Python(Arc::new(value))
177 }
178}
179
180unsafe impl Send for TimeEventCallback {}
182unsafe impl Sync for TimeEventCallback {}
183
184#[repr(C)]
185#[derive(Clone, Debug)]
186pub struct TimeEventHandlerV2 {
191 pub event: TimeEvent,
193 pub callback: TimeEventCallback,
195}
196
197impl TimeEventHandlerV2 {
198 #[must_use]
200 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
201 Self { event, callback }
202 }
203
204 pub fn run(self) {
205 let Self { event, callback } = self;
206 callback.call(event);
207 }
208}
209
210impl PartialOrd for TimeEventHandlerV2 {
211 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
212 Some(self.cmp(other))
213 }
214}
215
216impl PartialEq for TimeEventHandlerV2 {
217 fn eq(&self, other: &Self) -> bool {
218 self.event.ts_event == other.event.ts_event
219 }
220}
221
222impl Eq for TimeEventHandlerV2 {}
223
224impl Ord for TimeEventHandlerV2 {
225 fn cmp(&self, other: &Self) -> Ordering {
226 self.event.ts_event.cmp(&other.event.ts_event)
227 }
228}
229
230#[derive(Clone, Copy, Debug)]
235pub struct TestTimer {
236 pub name: Ustr,
238 pub interval_ns: NonZeroU64,
240 pub start_time_ns: UnixNanos,
242 pub stop_time_ns: Option<UnixNanos>,
244 next_time_ns: UnixNanos,
245 is_expired: bool,
246}
247
248impl TestTimer {
249 #[must_use]
256 pub fn new(
257 name: &str,
258 interval_ns: NonZeroU64,
259 start_time_ns: UnixNanos,
260 stop_time_ns: Option<UnixNanos>,
261 ) -> Self {
262 check_valid_string(name, stringify!(name)).expect(FAILED);
263
264 Self {
265 name: Ustr::from(name),
266 interval_ns,
267 start_time_ns,
268 stop_time_ns,
269 next_time_ns: start_time_ns + interval_ns.get(),
270 is_expired: false,
271 }
272 }
273
274 #[must_use]
276 pub const fn next_time_ns(&self) -> UnixNanos {
277 self.next_time_ns
278 }
279
280 #[must_use]
282 pub const fn is_expired(&self) -> bool {
283 self.is_expired
284 }
285
286 #[must_use]
287 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
288 TimeEvent {
289 name: self.name,
290 event_id,
291 ts_event: self.next_time_ns,
292 ts_init,
293 }
294 }
295
296 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
302 let advances = to_time_ns
303 .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
304 / self.interval_ns.get();
305 self.take(advances as usize).map(|(event, _)| event)
306 }
307
308 pub const fn cancel(&mut self) {
312 self.is_expired = true;
313 }
314}
315
316impl Iterator for TestTimer {
317 type Item = (TimeEvent, UnixNanos);
318
319 fn next(&mut self) -> Option<Self::Item> {
320 if self.is_expired {
321 None
322 } else {
323 let item = (
324 TimeEvent {
325 name: self.name,
326 event_id: UUID4::new(),
327 ts_event: self.next_time_ns,
328 ts_init: self.next_time_ns,
329 },
330 self.next_time_ns,
331 );
332
333 if let Some(stop_time_ns) = self.stop_time_ns {
335 if self.next_time_ns >= stop_time_ns {
336 self.is_expired = true;
337 }
338 }
339
340 self.next_time_ns += self.interval_ns;
341
342 Some(item)
343 }
344 }
345}
346
347#[derive(Debug)]
352pub struct LiveTimer {
353 pub name: Ustr,
355 pub interval_ns: NonZeroU64,
357 pub start_time_ns: UnixNanos,
359 pub stop_time_ns: Option<UnixNanos>,
361 next_time_ns: Arc<AtomicU64>,
362 callback: TimeEventCallback,
363 task_handle: Option<JoinHandle<()>>,
364 #[cfg(feature = "clock_v2")]
365 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
366}
367
368impl LiveTimer {
369 #[must_use]
377 #[cfg(not(feature = "clock_v2"))]
378 pub fn new(
379 name: &str,
380 interval_ns: NonZeroU64,
381 start_time_ns: UnixNanos,
382 stop_time_ns: Option<UnixNanos>,
383 callback: TimeEventCallback,
384 ) -> Self {
385 check_valid_string(name, stringify!(name)).expect(FAILED);
386
387 log::debug!("Creating timer '{name}'");
388 Self {
389 name: Ustr::from(name),
390 interval_ns,
391 start_time_ns,
392 stop_time_ns,
393 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
394 callback,
395 task_handle: None,
396 }
397 }
398
399 #[must_use]
407 #[cfg(feature = "clock_v2")]
408 pub fn new(
409 name: &str,
410 interval_ns: NonZeroU64,
411 start_time_ns: UnixNanos,
412 stop_time_ns: Option<UnixNanos>,
413 callback: TimeEventCallback,
414 heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
415 ) -> Self {
416 check_valid_string(name, stringify!(name)).expect(FAILED);
417
418 log::debug!("Creating timer '{name}'");
419 Self {
420 name: Ustr::from(name),
421 interval_ns,
422 start_time_ns,
423 stop_time_ns,
424 next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
425 callback,
426 heap,
427 task_handle: None,
428 }
429 }
430
431 #[must_use]
435 pub fn next_time_ns(&self) -> UnixNanos {
436 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
437 }
438
439 #[must_use]
444 pub fn is_expired(&self) -> bool {
445 self.task_handle
446 .as_ref()
447 .is_some_and(tokio::task::JoinHandle::is_finished)
448 }
449
450 pub fn start(&mut self) {
455 let event_name = self.name;
456 let stop_time_ns = self.stop_time_ns;
457 let next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
458 let next_time_atomic = self.next_time_ns.clone();
459 let interval_ns = self.interval_ns.get();
460
461 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
463
464 #[cfg(feature = "clock_v2")]
465 let heap = self.heap.clone();
466
467 let callback = self.callback.clone();
468 let rt = get_runtime();
469
470 let handle = rt.spawn(async move {
471 let clock = get_atomic_clock_realtime();
472 let now_ns = clock.get_time_ns();
473
474 let overhead = Duration::from_millis(1);
476 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
477 let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
478 let start = Instant::now() + delay;
479
480 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
481
482 loop {
483 timer.tick().await;
486 let now_ns = clock.get_time_ns();
487
488 #[cfg(feature = "python")]
489 {
490 match callback {
491 TimeEventCallback::Python(ref callback) => {
492 call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
493 }
494 TimeEventCallback::Rust(_) => {}
496 }
497 }
498
499 #[cfg(feature = "clock_v2")]
500 {
501 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
502 heap.lock().await.push(event);
503 }
504
505 next_time_ns += interval_ns;
507 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
508
509 if let Some(stop_time_ns) = stop_time_ns {
511 if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
512 break; }
514 }
515 }
516 });
517
518 self.task_handle = Some(handle);
519 }
520
521 pub fn cancel(&mut self) {
525 log::debug!("Cancel timer '{}'", self.name);
526 if let Some(ref handle) = self.task_handle {
527 handle.abort();
528 }
529 }
530}
531
532#[cfg(feature = "python")]
533fn call_python_with_time_event(
534 name: Ustr,
535 ts_event: UnixNanos,
536 ts_init: UnixNanos,
537 callback: &PyObject,
538) {
539 use pyo3::types::PyCapsule;
540
541 Python::with_gil(|py| {
542 let event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
544 let capsule: PyObject = PyCapsule::new(py, event, None)
545 .expect("Error creating `PyCapsule`")
546 .into_py_any_unwrap(py);
547
548 match callback.call1(py, (capsule,)) {
549 Ok(_) => {}
550 Err(e) => tracing::error!("Error on callback: {e:?}"),
551 }
552 });
553}
554
555#[cfg(test)]
559mod tests {
560 use std::num::NonZeroU64;
561
562 use nautilus_core::UnixNanos;
563 use rstest::*;
564
565 use super::{TestTimer, TimeEvent};
566
567 #[rstest]
568 fn test_test_timer_pop_event() {
569 let mut timer = TestTimer::new(
570 "test_timer",
571 NonZeroU64::new(1).unwrap(),
572 UnixNanos::from(1),
573 None,
574 );
575
576 assert!(timer.next().is_some());
577 assert!(timer.next().is_some());
578 timer.is_expired = true;
579 assert!(timer.next().is_none());
580 }
581
582 #[rstest]
583 fn test_test_timer_advance_within_next_time_ns() {
584 let mut timer = TestTimer::new(
585 "test_timer",
586 NonZeroU64::new(5).unwrap(),
587 UnixNanos::default(),
588 None,
589 );
590 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
591 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
592 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
593 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
594 assert_eq!(timer.next_time_ns, 5);
595 assert!(!timer.is_expired);
596 }
597
598 #[rstest]
599 fn test_test_timer_advance_up_to_next_time_ns() {
600 let mut timer = TestTimer::new(
601 "test_timer",
602 NonZeroU64::new(1).unwrap(),
603 UnixNanos::default(),
604 None,
605 );
606 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
607 assert!(!timer.is_expired);
608 }
609
610 #[rstest]
611 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
612 let mut timer = TestTimer::new(
613 "test_timer",
614 NonZeroU64::new(1).unwrap(),
615 UnixNanos::default(),
616 Some(UnixNanos::from(2)),
617 );
618 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
619 assert!(timer.is_expired);
620 }
621
622 #[rstest]
623 fn test_test_timer_advance_beyond_next_time_ns() {
624 let mut timer = TestTimer::new(
625 "test_timer",
626 NonZeroU64::new(1).unwrap(),
627 UnixNanos::default(),
628 Some(UnixNanos::from(5)),
629 );
630 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
631 assert!(timer.is_expired);
632 }
633
634 #[rstest]
635 fn test_test_timer_advance_beyond_stop_time() {
636 let mut timer = TestTimer::new(
637 "test_timer",
638 NonZeroU64::new(1).unwrap(),
639 UnixNanos::default(),
640 Some(UnixNanos::from(5)),
641 );
642 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
643 assert!(timer.is_expired);
644 }
645
646 #[rstest]
647 fn test_test_timer_advance_exact_boundary() {
648 let mut timer = TestTimer::new(
649 "boundary_timer",
650 NonZeroU64::new(5).unwrap(),
651 UnixNanos::from(0),
652 None,
653 );
654 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
655 assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
656
657 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
658 assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
659 }
660}