nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27    cmp::Ordering,
28    fmt::{Debug, Display},
29    num::NonZeroU64,
30    rc::Rc,
31    sync::{
32        atomic::{self, AtomicU64},
33        Arc,
34    },
35};
36
37use nautilus_core::{
38    correctness::{check_valid_string, FAILED},
39    datetime::floor_to_nearest_microsecond,
40    time::get_atomic_clock_realtime,
41    UnixNanos, UUID4,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46    task::JoinHandle,
47    time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
53/// Creates a valid nanoseconds interval that is guaranteed to be positive.
54///
55/// # Panics
56///
57/// This function panics:
58/// - If `interval_ns` is zero.
59#[must_use]
60pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
61    NonZeroU64::new(interval_ns).expect("`interval_ns` must be positive")
62}
63
64#[repr(C)]
65#[derive(Clone, Debug)]
66#[cfg_attr(
67    feature = "python",
68    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
69)]
70/// Represents a time event occurring at the event timestamp.
71///
72/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
73/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
74#[derive(Eq)]
75pub struct TimeEvent {
76    /// The event name, identifying the nature or purpose of the event.
77    pub name: Ustr,
78    /// The unique identifier for the event.
79    pub event_id: UUID4,
80    /// UNIX timestamp (nanoseconds) when the event occurred.
81    pub ts_event: UnixNanos,
82    /// UNIX timestamp (nanoseconds) when the instance was initialized.
83    pub ts_init: UnixNanos,
84}
85
86/// Reverse order for `TimeEvent` comparison to be used in max heap.
87impl PartialOrd for TimeEvent {
88    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
89        Some(self.cmp(other))
90    }
91}
92
93/// Reverse order for `TimeEvent` comparison to be used in max heap.
94impl Ord for TimeEvent {
95    fn cmp(&self, other: &Self) -> Ordering {
96        other.ts_event.cmp(&self.ts_event)
97    }
98}
99
100impl TimeEvent {
101    /// Creates a new [`TimeEvent`] instance.
102    ///
103    /// # Safety
104    ///
105    /// - Assumes `name` is a valid string.
106    #[must_use]
107    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
108        Self {
109            name,
110            event_id,
111            ts_event,
112            ts_init,
113        }
114    }
115}
116
117impl Display for TimeEvent {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        write!(
120            f,
121            "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
122            self.name, self.event_id, self.ts_event, self.ts_init
123        )
124    }
125}
126
127impl PartialEq for TimeEvent {
128    fn eq(&self, other: &Self) -> bool {
129        self.event_id == other.event_id
130    }
131}
132
133pub type RustTimeEventCallback = dyn Fn(TimeEvent);
134
135#[derive(Clone)]
136pub enum TimeEventCallback {
137    #[cfg(feature = "python")]
138    Python(Arc<PyObject>),
139    Rust(Rc<RustTimeEventCallback>),
140}
141
142impl Debug for TimeEventCallback {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        match self {
145            #[cfg(feature = "python")]
146            Self::Python(_) => f.write_str("Python callback"),
147            Self::Rust(_) => f.write_str("Rust callback"),
148        }
149    }
150}
151
152impl TimeEventCallback {
153    pub fn call(&self, event: TimeEvent) {
154        match self {
155            #[cfg(feature = "python")]
156            Self::Python(callback) => {
157                Python::with_gil(|py| {
158                    callback.call1(py, (event,)).unwrap();
159                });
160            }
161            Self::Rust(callback) => callback(event),
162        }
163    }
164}
165
166impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
167    fn from(value: Rc<RustTimeEventCallback>) -> Self {
168        Self::Rust(value)
169    }
170}
171
172#[cfg(feature = "python")]
173impl From<PyObject> for TimeEventCallback {
174    fn from(value: PyObject) -> Self {
175        Self::Python(Arc::new(value))
176    }
177}
178
179// SAFETY: Message handlers cannot be sent across thread boundaries
180unsafe impl Send for TimeEventCallback {}
181unsafe impl Sync for TimeEventCallback {}
182
183#[repr(C)]
184#[derive(Clone, Debug)]
185/// Represents a time event and its associated handler.
186///
187/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
188/// when the event's timestamp is reached.
189pub struct TimeEventHandlerV2 {
190    /// The time event.
191    pub event: TimeEvent,
192    /// The callable handler for the event.
193    pub callback: TimeEventCallback,
194}
195
196impl TimeEventHandlerV2 {
197    /// Creates a new [`TimeEventHandlerV2`] instance.
198    #[must_use]
199    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
200        Self { event, callback }
201    }
202
203    pub fn run(self) {
204        let Self { event, callback } = self;
205        callback.call(event);
206    }
207}
208
209impl PartialOrd for TimeEventHandlerV2 {
210    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
211        Some(self.cmp(other))
212    }
213}
214
215impl PartialEq for TimeEventHandlerV2 {
216    fn eq(&self, other: &Self) -> bool {
217        self.event.ts_event == other.event.ts_event
218    }
219}
220
221impl Eq for TimeEventHandlerV2 {}
222
223impl Ord for TimeEventHandlerV2 {
224    fn cmp(&self, other: &Self) -> Ordering {
225        self.event.ts_event.cmp(&other.event.ts_event)
226    }
227}
228
229/// A test timer for user with a `TestClock`.
230///
231/// `TestTimer` simulates time progression in a controlled environment,
232/// allowing for precise control over event generation in test scenarios.
233#[derive(Clone, Copy, Debug)]
234pub struct TestTimer {
235    /// The name of the timer.
236    pub name: Ustr,
237    /// The interval between timer events in nanoseconds.
238    pub interval_ns: NonZeroU64,
239    /// The start time of the timer in UNIX nanoseconds.
240    pub start_time_ns: UnixNanos,
241    /// The optional stop time of the timer in UNIX nanoseconds.
242    pub stop_time_ns: Option<UnixNanos>,
243    next_time_ns: UnixNanos,
244    is_expired: bool,
245}
246
247impl TestTimer {
248    /// Creates a new [`TestTimer`] instance.
249    ///
250    /// # Panics
251    ///
252    /// This function panics:
253    /// - If `name` is not a valid string.
254    #[must_use]
255    pub fn new(
256        name: &str,
257        interval_ns: NonZeroU64,
258        start_time_ns: UnixNanos,
259        stop_time_ns: Option<UnixNanos>,
260    ) -> Self {
261        check_valid_string(name, stringify!(name)).expect(FAILED);
262
263        Self {
264            name: Ustr::from(name),
265            interval_ns,
266            start_time_ns,
267            stop_time_ns,
268            next_time_ns: start_time_ns + interval_ns.get(),
269            is_expired: false,
270        }
271    }
272
273    /// Returns the next time in UNIX nanoseconds when the timer will fire.
274    #[must_use]
275    pub const fn next_time_ns(&self) -> UnixNanos {
276        self.next_time_ns
277    }
278
279    /// Returns whether the timer is expired.
280    #[must_use]
281    pub const fn is_expired(&self) -> bool {
282        self.is_expired
283    }
284
285    #[must_use]
286    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
287        TimeEvent {
288            name: self.name,
289            event_id,
290            ts_event: self.next_time_ns,
291            ts_init,
292        }
293    }
294
295    /// Advance the test timer forward to the given time, generating a sequence
296    /// of events. A [`TimeEvent`] is appended for each time a next event is
297    /// <= the given `to_time_ns`.
298    ///
299    /// This allows testing of multiple time intervals within a single step.
300    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
301        let advances = to_time_ns
302            .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
303            / self.interval_ns.get();
304        self.take(advances as usize).map(|(event, _)| event)
305    }
306
307    /// Cancels the timer (the timer will not generate an event).
308    ///
309    /// Used to stop the timer before its scheduled stop time.
310    pub const fn cancel(&mut self) {
311        self.is_expired = true;
312    }
313}
314
315impl Iterator for TestTimer {
316    type Item = (TimeEvent, UnixNanos);
317
318    fn next(&mut self) -> Option<Self::Item> {
319        if self.is_expired {
320            None
321        } else {
322            let item = (
323                TimeEvent {
324                    name: self.name,
325                    event_id: UUID4::new(),
326                    ts_event: self.next_time_ns,
327                    ts_init: self.next_time_ns,
328                },
329                self.next_time_ns,
330            );
331
332            // If current next event time has exceeded stop time, then expire timer
333            if let Some(stop_time_ns) = self.stop_time_ns {
334                if self.next_time_ns >= stop_time_ns {
335                    self.is_expired = true;
336                }
337            }
338
339            self.next_time_ns += self.interval_ns;
340
341            Some(item)
342        }
343    }
344}
345
346/// A live timer for use with a `LiveClock`.
347///
348/// `LiveTimer` triggers events at specified intervals in a real-time environment,
349/// using Tokio's async runtime to handle scheduling and execution.
350#[derive(Debug)]
351pub struct LiveTimer {
352    /// The name of the timer.
353    pub name: Ustr,
354    /// The start time of the timer in UNIX nanoseconds.
355    pub interval_ns: NonZeroU64,
356    /// The start time of the timer in UNIX nanoseconds.
357    pub start_time_ns: UnixNanos,
358    /// The optional stop time of the timer in UNIX nanoseconds.
359    pub stop_time_ns: Option<UnixNanos>,
360    next_time_ns: Arc<AtomicU64>,
361    callback: TimeEventCallback,
362    task_handle: Option<JoinHandle<()>>,
363    #[cfg(feature = "clock_v2")]
364    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
365}
366
367impl LiveTimer {
368    /// Creates a new [`LiveTimer`] instance.
369    ///
370    /// # Panics
371    ///
372    /// This function panics:
373    /// - If `name` is not a valid string.
374    /// - If `interval_ns` is zero.
375    #[must_use]
376    #[cfg(not(feature = "clock_v2"))]
377    pub fn new(
378        name: &str,
379        interval_ns: NonZeroU64,
380        start_time_ns: UnixNanos,
381        stop_time_ns: Option<UnixNanos>,
382        callback: TimeEventCallback,
383    ) -> Self {
384        check_valid_string(name, stringify!(name)).expect(FAILED);
385
386        log::debug!("Creating timer '{name}'");
387        Self {
388            name: Ustr::from(name),
389            interval_ns,
390            start_time_ns,
391            stop_time_ns,
392            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
393            callback,
394            task_handle: None,
395        }
396    }
397
398    /// Creates a new [`LiveTimer`] instance.
399    ///
400    /// # Panics
401    ///
402    /// This function panics:
403    /// - If `name` is not a valid string.
404    /// - If `interval_ns` is zero.
405    #[must_use]
406    #[cfg(feature = "clock_v2")]
407    pub fn new(
408        name: &str,
409        interval_ns: NonZeroU64,
410        start_time_ns: UnixNanos,
411        stop_time_ns: Option<UnixNanos>,
412        callback: TimeEventCallback,
413        heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
414    ) -> Self {
415        check_valid_string(name, stringify!(name)).expect(FAILED);
416
417        log::debug!("Creating timer '{name}'");
418        Self {
419            name: Ustr::from(name),
420            interval_ns,
421            start_time_ns,
422            stop_time_ns,
423            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
424            callback,
425            heap,
426            task_handle: None,
427        }
428    }
429
430    /// Returns the next time in UNIX nanoseconds when the timer will fire.
431    ///
432    /// Provides the scheduled time for the next event based on the current state of the timer.
433    #[must_use]
434    pub fn next_time_ns(&self) -> UnixNanos {
435        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
436    }
437
438    /// Returns whether the timer is expired.
439    ///
440    /// An expired timer will not trigger any further events.
441    /// A timer that has not been started is not expired.
442    #[must_use]
443    pub fn is_expired(&self) -> bool {
444        self.task_handle
445            .as_ref()
446            .is_some_and(tokio::task::JoinHandle::is_finished)
447    }
448
449    /// Starts the timer.
450    ///
451    /// Time events will begin triggering at the specified intervals.
452    /// The generated events are handled by the provided callback function.
453    pub fn start(&mut self) {
454        let event_name = self.name;
455        let stop_time_ns = self.stop_time_ns;
456        let next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
457        let next_time_atomic = self.next_time_ns.clone();
458        let interval_ns = self.interval_ns.get();
459
460        // Floor the next time to the nearest microsecond which is within the timers accuracy
461        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
462
463        #[cfg(feature = "clock_v2")]
464        let heap = self.heap.clone();
465
466        let callback = self.callback.clone();
467        let rt = get_runtime();
468
469        let handle = rt.spawn(async move {
470            let clock = get_atomic_clock_realtime();
471            let now_ns = clock.get_time_ns();
472
473            // 1-millisecond delay to account for the overhead of initializing a tokio timer
474            let overhead = Duration::from_millis(1);
475            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
476            let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
477            let start = Instant::now() + delay;
478
479            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
480
481            loop {
482                // SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
483                // first then no tick has been consumed (no event was ready).
484                timer.tick().await;
485                let now_ns = clock.get_time_ns();
486
487                #[cfg(feature = "python")]
488                {
489                    match callback {
490                        TimeEventCallback::Python(ref callback) => {
491                            call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
492                        }
493                        // Note: Clock v1 style path should not be called with Rust callback
494                        TimeEventCallback::Rust(_) => {}
495                    }
496                }
497
498                #[cfg(feature = "clock_v2")]
499                {
500                    let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
501                    heap.lock().await.push(event);
502                }
503
504                // Prepare next time interval
505                next_time_ns += interval_ns;
506                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
507
508                // Check if expired
509                if let Some(stop_time_ns) = stop_time_ns {
510                    if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
511                        break; // Timer expired
512                    }
513                }
514            }
515        });
516
517        self.task_handle = Some(handle);
518    }
519
520    /// Cancels the timer.
521    ///
522    /// The timer will not generate a final event.
523    pub fn cancel(&mut self) {
524        log::debug!("Cancel timer '{}'", self.name);
525        if let Some(ref handle) = self.task_handle {
526            handle.abort();
527        }
528    }
529}
530
531#[cfg(feature = "python")]
532fn call_python_with_time_event(
533    name: Ustr,
534    ts_event: UnixNanos,
535    ts_init: UnixNanos,
536    callback: &PyObject,
537) {
538    use pyo3::{types::PyCapsule, IntoPy};
539
540    Python::with_gil(|py| {
541        // Create new time event
542        let event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
543        let capsule: PyObject = PyCapsule::new(py, event, None)
544            .expect("Error creating `PyCapsule`")
545            .into_py(py);
546
547        match callback.call1(py, (capsule,)) {
548            Ok(_) => {}
549            Err(e) => tracing::error!("Error on callback: {e:?}"),
550        }
551    });
552}
553
554////////////////////////////////////////////////////////////////////////////////
555// Tests
556////////////////////////////////////////////////////////////////////////////////
557#[cfg(test)]
558mod tests {
559    use std::num::NonZeroU64;
560
561    use nautilus_core::UnixNanos;
562    use rstest::*;
563
564    use super::{TestTimer, TimeEvent};
565
566    #[rstest]
567    fn test_test_timer_pop_event() {
568        let mut timer = TestTimer::new(
569            "test_timer",
570            NonZeroU64::new(1).unwrap(),
571            UnixNanos::from(1),
572            None,
573        );
574
575        assert!(timer.next().is_some());
576        assert!(timer.next().is_some());
577        timer.is_expired = true;
578        assert!(timer.next().is_none());
579    }
580
581    #[rstest]
582    fn test_test_timer_advance_within_next_time_ns() {
583        let mut timer = TestTimer::new(
584            "test_timer",
585            NonZeroU64::new(5).unwrap(),
586            UnixNanos::default(),
587            None,
588        );
589        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
590        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
591        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
592        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
593        assert_eq!(timer.next_time_ns, 5);
594        assert!(!timer.is_expired);
595    }
596
597    #[rstest]
598    fn test_test_timer_advance_up_to_next_time_ns() {
599        let mut timer = TestTimer::new(
600            "test_timer",
601            NonZeroU64::new(1).unwrap(),
602            UnixNanos::default(),
603            None,
604        );
605        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
606        assert!(!timer.is_expired);
607    }
608
609    #[rstest]
610    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
611        let mut timer = TestTimer::new(
612            "test_timer",
613            NonZeroU64::new(1).unwrap(),
614            UnixNanos::default(),
615            Some(UnixNanos::from(2)),
616        );
617        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
618        assert!(timer.is_expired);
619    }
620
621    #[rstest]
622    fn test_test_timer_advance_beyond_next_time_ns() {
623        let mut timer = TestTimer::new(
624            "test_timer",
625            NonZeroU64::new(1).unwrap(),
626            UnixNanos::default(),
627            Some(UnixNanos::from(5)),
628        );
629        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
630        assert!(timer.is_expired);
631    }
632
633    #[rstest]
634    fn test_test_timer_advance_beyond_stop_time() {
635        let mut timer = TestTimer::new(
636            "test_timer",
637            NonZeroU64::new(1).unwrap(),
638            UnixNanos::default(),
639            Some(UnixNanos::from(5)),
640        );
641        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
642        assert!(timer.is_expired);
643    }
644
645    #[rstest]
646    fn test_test_timer_advance_exact_boundary() {
647        let mut timer = TestTimer::new(
648            "boundary_timer",
649            NonZeroU64::new(5).unwrap(),
650            UnixNanos::from(0),
651            None,
652        );
653        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
654        assert_eq!(events.len(), 1, "Expected one event at the 5 ns boundary");
655
656        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
657        assert_eq!(events.len(), 1, "Expected one event at the 10 ns boundary");
658    }
659}