nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27    cmp::Ordering,
28    fmt::{Debug, Display},
29    num::NonZeroU64,
30    rc::Rc,
31    sync::{
32        Arc,
33        atomic::{self, AtomicU64},
34    },
35};
36
37use nautilus_core::{
38    UUID4, UnixNanos,
39    correctness::{FAILED, check_valid_string},
40    datetime::floor_to_nearest_microsecond,
41    python::IntoPyObjectNautilusExt,
42    time::get_atomic_clock_realtime,
43};
44#[cfg(feature = "python")]
45use pyo3::{PyObject, Python};
46use tokio::{
47    task::JoinHandle,
48    time::{Duration, Instant},
49};
50use ustr::Ustr;
51
52use crate::runtime::get_runtime;
53
/// Converts a raw nanosecond count into a [`NonZeroU64`] timer interval.
///
/// # Panics
///
/// This function panics:
/// - If `interval_ns` is zero.
#[must_use]
pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    match NonZeroU64::new(interval_ns) {
        Some(interval) => interval,
        None => panic!("`interval_ns` must be positive"),
    }
}
64
65#[repr(C)]
66#[derive(Clone, Debug)]
67#[cfg_attr(
68    feature = "python",
69    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
70)]
71/// Represents a time event occurring at the event timestamp.
72///
73/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
74/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
75#[derive(Eq)]
76pub struct TimeEvent {
77    /// The event name, identifying the nature or purpose of the event.
78    pub name: Ustr,
79    /// The unique identifier for the event.
80    pub event_id: UUID4,
81    /// UNIX timestamp (nanoseconds) when the event occurred.
82    pub ts_event: UnixNanos,
83    /// UNIX timestamp (nanoseconds) when the instance was initialized.
84    pub ts_init: UnixNanos,
85}
86
87/// Reverse order for `TimeEvent` comparison to be used in max heap.
88impl PartialOrd for TimeEvent {
89    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
90        Some(self.cmp(other))
91    }
92}
93
94/// Reverse order for `TimeEvent` comparison to be used in max heap.
95impl Ord for TimeEvent {
96    fn cmp(&self, other: &Self) -> Ordering {
97        other.ts_event.cmp(&self.ts_event)
98    }
99}
100
impl TimeEvent {
    /// Creates a new [`TimeEvent`] instance.
    ///
    /// The arguments are stored exactly as given. No validation is performed here, so
    /// `name` is assumed to already be a valid string.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
117
118impl Display for TimeEvent {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        write!(
121            f,
122            "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
123            self.name, self.event_id, self.ts_event, self.ts_init
124        )
125    }
126}
127
128impl PartialEq for TimeEvent {
129    fn eq(&self, other: &Self) -> bool {
130        self.event_id == other.event_id
131    }
132}
133
/// Unsized signature of a Rust time event callback: any `Fn` that takes a [`TimeEvent`]
/// by value and returns nothing.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);
135
/// A callback that receives a [`TimeEvent`], backed either by a Python callable or by a
/// Rust closure.
#[derive(Clone)]
pub enum TimeEventCallback {
    /// A shared Python object that is called with the event (requires the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PyObject>),
    /// A reference-counted Rust closure that is called with the event.
    Rust(Rc<RustTimeEventCallback>),
}
142
143impl Debug for TimeEventCallback {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        match self {
146            #[cfg(feature = "python")]
147            Self::Python(_) => f.write_str("Python callback"),
148            Self::Rust(_) => f.write_str("Rust callback"),
149        }
150    }
151}
152
153impl TimeEventCallback {
154    pub fn call(&self, event: TimeEvent) {
155        match self {
156            #[cfg(feature = "python")]
157            Self::Python(callback) => {
158                Python::with_gil(|py| {
159                    callback.call1(py, (event,)).unwrap();
160                });
161            }
162            Self::Rust(callback) => callback(event),
163        }
164    }
165}
166
167impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
168    fn from(value: Rc<RustTimeEventCallback>) -> Self {
169        Self::Rust(value)
170    }
171}
172
/// Wraps a Python object in an [`Arc`] as a [`TimeEventCallback::Python`].
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    fn from(callback: PyObject) -> Self {
        let shared = Arc::new(callback);
        Self::Python(shared)
    }
}
179
// SAFETY: NOTE(review) — the compiler cannot verify these impls. The `Rust` variant wraps an
// `Rc`, which is neither `Send` nor `Sync`, so soundness relies on a `Rust` callback never
// actually being moved to, or shared with, another thread — TODO confirm this invariant
// holds at every use site.
unsafe impl Send for TimeEventCallback {}
unsafe impl Sync for TimeEventCallback {}
183
#[repr(C)]
#[derive(Clone, Debug)]
/// Represents a time event and its associated handler.
///
/// `TimeEventHandlerV2` pairs a `TimeEvent` with the callback that should receive it;
/// calling `run` hands the event to that callback.
pub struct TimeEventHandlerV2 {
    /// The time event.
    pub event: TimeEvent,
    /// The callable handler for the event.
    pub callback: TimeEventCallback,
}
196
197impl TimeEventHandlerV2 {
198    /// Creates a new [`TimeEventHandlerV2`] instance.
199    #[must_use]
200    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
201        Self { event, callback }
202    }
203
204    pub fn run(self) {
205        let Self { event, callback } = self;
206        callback.call(event);
207    }
208}
209
210impl PartialOrd for TimeEventHandlerV2 {
211    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
212        Some(self.cmp(other))
213    }
214}
215
216impl PartialEq for TimeEventHandlerV2 {
217    fn eq(&self, other: &Self) -> bool {
218        self.event.ts_event == other.event.ts_event
219    }
220}
221
// Marker impl: the `ts_event`-based equality of `TimeEventHandlerV2` is declared a full
// equivalence relation, as required by `Ord`.
impl Eq for TimeEventHandlerV2 {}
223
224impl Ord for TimeEventHandlerV2 {
225    fn cmp(&self, other: &Self) -> Ordering {
226        self.event.ts_event.cmp(&other.event.ts_event)
227    }
228}
229
/// A test timer for use with a `TestClock`.
///
/// `TestTimer` simulates time progression in a controlled environment,
/// allowing for precise control over event generation in test scenarios.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The time of the next event in UNIX nanoseconds.
    next_time_ns: UnixNanos,
    /// Whether the timer has expired (or been cancelled) and will yield no further events.
    is_expired: bool,
}
247
248impl TestTimer {
249    /// Creates a new [`TestTimer`] instance.
250    ///
251    /// # Panics
252    ///
253    /// This function panics:
254    /// - If `name` is not a valid string.
255    #[must_use]
256    pub fn new(
257        name: &str,
258        interval_ns: NonZeroU64,
259        start_time_ns: UnixNanos,
260        stop_time_ns: Option<UnixNanos>,
261    ) -> Self {
262        check_valid_string(name, stringify!(name)).expect(FAILED);
263
264        Self {
265            name: Ustr::from(name),
266            interval_ns,
267            start_time_ns,
268            stop_time_ns,
269            next_time_ns: start_time_ns + interval_ns.get(),
270            is_expired: false,
271        }
272    }
273
274    /// Returns the next time in UNIX nanoseconds when the timer will fire.
275    #[must_use]
276    pub const fn next_time_ns(&self) -> UnixNanos {
277        self.next_time_ns
278    }
279
280    /// Returns whether the timer is expired.
281    #[must_use]
282    pub const fn is_expired(&self) -> bool {
283        self.is_expired
284    }
285
286    #[must_use]
287    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
288        TimeEvent {
289            name: self.name,
290            event_id,
291            ts_event: self.next_time_ns,
292            ts_init,
293        }
294    }
295
296    /// Advance the test timer forward to the given time, generating a sequence
297    /// of events. A [`TimeEvent`] is appended for each time a next event is
298    /// <= the given `to_time_ns`.
299    ///
300    /// This allows testing of multiple time intervals within a single step.
301    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
302        let advances = to_time_ns
303            .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
304            / self.interval_ns.get();
305        self.take(advances as usize).map(|(event, _)| event)
306    }
307
308    /// Cancels the timer (the timer will not generate an event).
309    ///
310    /// Used to stop the timer before its scheduled stop time.
311    pub const fn cancel(&mut self) {
312        self.is_expired = true;
313    }
314}
315
316impl Iterator for TestTimer {
317    type Item = (TimeEvent, UnixNanos);
318
319    fn next(&mut self) -> Option<Self::Item> {
320        if self.is_expired {
321            None
322        } else {
323            let item = (
324                TimeEvent {
325                    name: self.name,
326                    event_id: UUID4::new(),
327                    ts_event: self.next_time_ns,
328                    ts_init: self.next_time_ns,
329                },
330                self.next_time_ns,
331            );
332
333            // If current next event time has exceeded stop time, then expire timer
334            if let Some(stop_time_ns) = self.stop_time_ns {
335                if self.next_time_ns >= stop_time_ns {
336                    self.is_expired = true;
337                }
338            }
339
340            self.next_time_ns += self.interval_ns;
341
342            Some(item)
343        }
344    }
345}
346
/// A live timer for use with a `LiveClock`.
///
/// `LiveTimer` triggers events at specified intervals in a real-time environment,
/// using Tokio's async runtime to handle scheduling and execution.
#[derive(Debug)]
pub struct LiveTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The next fire time in UNIX nanoseconds, shared with the spawned timer task.
    next_time_ns: Arc<AtomicU64>,
    /// The callback handed to the spawned timer task.
    callback: TimeEventCallback,
    /// Handle of the spawned timer task; `None` until the timer has been started.
    task_handle: Option<JoinHandle<()>>,
    /// Shared heap onto which the timer task pushes generated events.
    #[cfg(feature = "clock_v2")]
    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
}
367
368impl LiveTimer {
369    /// Creates a new [`LiveTimer`] instance.
370    ///
371    /// # Panics
372    ///
373    /// This function panics:
374    /// - If `name` is not a valid string.
375    /// - If `interval_ns` is zero.
376    #[must_use]
377    #[cfg(not(feature = "clock_v2"))]
378    pub fn new(
379        name: &str,
380        interval_ns: NonZeroU64,
381        start_time_ns: UnixNanos,
382        stop_time_ns: Option<UnixNanos>,
383        callback: TimeEventCallback,
384    ) -> Self {
385        check_valid_string(name, stringify!(name)).expect(FAILED);
386
387        log::debug!("Creating timer '{name}'");
388        Self {
389            name: Ustr::from(name),
390            interval_ns,
391            start_time_ns,
392            stop_time_ns,
393            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
394            callback,
395            task_handle: None,
396        }
397    }
398
399    /// Creates a new [`LiveTimer`] instance.
400    ///
401    /// # Panics
402    ///
403    /// This function panics:
404    /// - If `name` is not a valid string.
405    /// - If `interval_ns` is zero.
406    #[must_use]
407    #[cfg(feature = "clock_v2")]
408    pub fn new(
409        name: &str,
410        interval_ns: NonZeroU64,
411        start_time_ns: UnixNanos,
412        stop_time_ns: Option<UnixNanos>,
413        callback: TimeEventCallback,
414        heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
415    ) -> Self {
416        check_valid_string(name, stringify!(name)).expect(FAILED);
417
418        log::debug!("Creating timer '{name}'");
419        Self {
420            name: Ustr::from(name),
421            interval_ns,
422            start_time_ns,
423            stop_time_ns,
424            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
425            callback,
426            heap,
427            task_handle: None,
428        }
429    }
430
431    /// Returns the next time in UNIX nanoseconds when the timer will fire.
432    ///
433    /// Provides the scheduled time for the next event based on the current state of the timer.
434    #[must_use]
435    pub fn next_time_ns(&self) -> UnixNanos {
436        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
437    }
438
439    /// Returns whether the timer is expired.
440    ///
441    /// An expired timer will not trigger any further events.
442    /// A timer that has not been started is not expired.
443    #[must_use]
444    pub fn is_expired(&self) -> bool {
445        self.task_handle
446            .as_ref()
447            .is_some_and(tokio::task::JoinHandle::is_finished)
448    }
449
450    /// Starts the timer.
451    ///
452    /// Time events will begin triggering at the specified intervals.
453    /// The generated events are handled by the provided callback function.
454    pub fn start(&mut self) {
455        let event_name = self.name;
456        let stop_time_ns = self.stop_time_ns;
457        let next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
458        let next_time_atomic = self.next_time_ns.clone();
459        let interval_ns = self.interval_ns.get();
460
461        // Floor the next time to the nearest microsecond which is within the timers accuracy
462        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
463
464        #[cfg(feature = "clock_v2")]
465        let heap = self.heap.clone();
466
467        let callback = self.callback.clone();
468        let rt = get_runtime();
469
470        let handle = rt.spawn(async move {
471            let clock = get_atomic_clock_realtime();
472            let now_ns = clock.get_time_ns();
473
474            // 1-millisecond delay to account for the overhead of initializing a tokio timer
475            let overhead = Duration::from_millis(1);
476            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
477            let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
478            let start = Instant::now() + delay;
479
480            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
481
482            loop {
483                // SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
484                // first then no tick has been consumed (no event was ready).
485                timer.tick().await;
486                let now_ns = clock.get_time_ns();
487
488                #[cfg(feature = "python")]
489                {
490                    match callback {
491                        TimeEventCallback::Python(ref callback) => {
492                            call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
493                        }
494                        // Note: Clock v1 style path should not be called with Rust callback
495                        TimeEventCallback::Rust(_) => {}
496                    }
497                }
498
499                #[cfg(feature = "clock_v2")]
500                {
501                    let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
502                    heap.lock().await.push(event);
503                }
504
505                // Prepare next time interval
506                next_time_ns += interval_ns;
507                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
508
509                // Check if expired
510                if let Some(stop_time_ns) = stop_time_ns {
511                    if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
512                        break; // Timer expired
513                    }
514                }
515            }
516        });
517
518        self.task_handle = Some(handle);
519    }
520
521    /// Cancels the timer.
522    ///
523    /// The timer will not generate a final event.
524    pub fn cancel(&mut self) {
525        log::debug!("Cancel timer '{}'", self.name);
526        if let Some(ref handle) = self.task_handle {
527            handle.abort();
528        }
529    }
530}
531
/// Builds a [`TimeEvent`] from the given name and timestamps, wraps it in a `PyCapsule`
/// and passes it to the Python `callback` while holding the GIL.
///
/// An error returned by the callback is logged rather than propagated.
///
/// # Panics
///
/// This function panics:
/// - If the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(
    name: Ustr,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
    callback: &PyObject,
) {
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        let time_event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
        let wrapped: PyObject = PyCapsule::new(py, time_event, None)
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (wrapped,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
554
555////////////////////////////////////////////////////////////////////////////////
556// Tests
557////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use rstest::*;

    use super::{TestTimer, TimeEvent};

    /// Builds a [`TestTimer`] from a raw interval and an optional raw stop time.
    fn make_timer(
        name: &str,
        interval_ns: u64,
        start_time_ns: UnixNanos,
        stop_time_ns: Option<u64>,
    ) -> TestTimer {
        TestTimer::new(
            name,
            NonZeroU64::new(interval_ns).unwrap(),
            start_time_ns,
            stop_time_ns.map(UnixNanos::from),
        )
    }

    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = make_timer("test_timer", 1, UnixNanos::from(1), None);

        for _ in 0..2 {
            assert!(timer.next().is_some());
        }
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = make_timer("test_timer", 5, UnixNanos::default(), None);

        // Advancing to times before the first fire time must not produce events
        for to_time_ns in 1..=3 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }
        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = make_timer("test_timer", 1, UnixNanos::default(), None);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = make_timer("test_timer", 1, UnixNanos::default(), Some(2));

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = make_timer("test_timer", 1, UnixNanos::default(), Some(5));

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = make_timer("test_timer", 1, UnixNanos::default(), Some(5));

        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = make_timer("boundary_timer", 5, UnixNanos::from(0), None);

        let first: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(first.len(), 1, "Expected one event at the 5 ns boundary");

        let second: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(second.len(), 1, "Expected one event at the 10 ns boundary");
    }
}