nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18#[rustfmt::skip]
19#[cfg(feature = "clock_v2")]
20use std::collections::BinaryHeap;
21
22#[rustfmt::skip]
23#[cfg(feature = "clock_v2")]
24use tokio::sync::Mutex;
25
26use std::{
27    cmp::Ordering,
28    fmt::{Debug, Display},
29    num::NonZeroU64,
30    rc::Rc,
31    sync::{
32        Arc,
33        atomic::{self, AtomicU64},
34    },
35};
36
37use nautilus_core::{
38    UUID4, UnixNanos,
39    correctness::{FAILED, check_valid_string},
40    datetime::floor_to_nearest_microsecond,
41    time::get_atomic_clock_realtime,
42};
43#[cfg(feature = "python")]
44use pyo3::{PyObject, Python};
45use tokio::{
46    task::JoinHandle,
47    time::{Duration, Instant},
48};
49use ustr::Ustr;
50
51use crate::runtime::get_runtime;
52
/// Converts a raw nanosecond count into a [`NonZeroU64`] timer interval.
///
/// # Panics
///
/// Panics if `interval_ns` is zero.
#[must_use]
pub const fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    match NonZeroU64::new(interval_ns) {
        Some(interval) => interval,
        None => panic!("`interval_ns` must be positive"),
    }
}
62
63#[repr(C)]
64#[derive(Clone, Debug)]
65#[cfg_attr(
66    feature = "python",
67    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
68)]
69/// Represents a time event occurring at the event timestamp.
70///
71/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
72/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
73#[derive(Eq)]
74pub struct TimeEvent {
75    /// The event name, identifying the nature or purpose of the event.
76    pub name: Ustr,
77    /// The unique identifier for the event.
78    pub event_id: UUID4,
79    /// UNIX timestamp (nanoseconds) when the event occurred.
80    pub ts_event: UnixNanos,
81    /// UNIX timestamp (nanoseconds) when the instance was initialized.
82    pub ts_init: UnixNanos,
83}
84
85/// Reverse order for `TimeEvent` comparison to be used in max heap.
86impl PartialOrd for TimeEvent {
87    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
92/// Reverse order for `TimeEvent` comparison to be used in max heap.
93impl Ord for TimeEvent {
94    fn cmp(&self, other: &Self) -> Ordering {
95        other.ts_event.cmp(&self.ts_event)
96    }
97}
98
impl TimeEvent {
    /// Creates a new [`TimeEvent`] instance.
    ///
    /// The arguments are stored as given; `name` is assumed to be a valid string and is
    /// not checked here.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
115
116impl Display for TimeEvent {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        write!(
119            f,
120            "TimeEvent(name={}, event_id={}, ts_event={}, ts_init={})",
121            self.name, self.event_id, self.ts_event, self.ts_init
122        )
123    }
124}
125
126impl PartialEq for TimeEvent {
127    fn eq(&self, other: &Self) -> bool {
128        self.event_id == other.event_id
129    }
130}
131
/// The unsized signature of a Rust time event callback: a closure taking a [`TimeEvent`] by value.
pub type RustTimeEventCallback = dyn Fn(TimeEvent);
133
/// A callback to be invoked with a [`TimeEvent`].
///
/// Cloning is cheap: both variants hold their callable behind a reference-counted pointer.
#[derive(Clone)]
pub enum TimeEventCallback {
    /// A Python callable (only available with the `python` feature).
    #[cfg(feature = "python")]
    Python(Arc<PyObject>),
    /// A Rust closure held in an `Rc`.
    Rust(Rc<RustTimeEventCallback>),
}
140
141impl Debug for TimeEventCallback {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        match self {
144            #[cfg(feature = "python")]
145            Self::Python(_) => f.write_str("Python callback"),
146            Self::Rust(_) => f.write_str("Rust callback"),
147        }
148    }
149}
150
151impl TimeEventCallback {
152    pub fn call(&self, event: TimeEvent) {
153        match self {
154            #[cfg(feature = "python")]
155            Self::Python(callback) => {
156                Python::with_gil(|py| {
157                    callback.call1(py, (event,)).unwrap();
158                });
159            }
160            Self::Rust(callback) => callback(event),
161        }
162    }
163}
164
/// Wraps a reference-counted Rust closure as a [`TimeEventCallback::Rust`] callback.
impl From<Rc<RustTimeEventCallback>> for TimeEventCallback {
    fn from(value: Rc<RustTimeEventCallback>) -> Self {
        Self::Rust(value)
    }
}
170
/// Wraps a Python object as a [`TimeEventCallback::Python`] callback, placing it in a new `Arc`.
#[cfg(feature = "python")]
impl From<PyObject> for TimeEventCallback {
    fn from(value: PyObject) -> Self {
        Self::Python(Arc::new(value))
    }
}
177
// SAFETY: The compiler cannot verify these impls: the `Rust` variant wraps an `Rc`, which
// is neither `Send` nor `Sync`. Soundness therefore relies on callers upholding the rule
// that a `Rust` callback must never be cloned, invoked or dropped on a thread other than
// the one that created it.
// NOTE(review): `LiveTimer::start` moves a clone of the callback into a spawned task;
// confirm that path is only ever reached with `Python` callbacks.
unsafe impl Send for TimeEventCallback {}
unsafe impl Sync for TimeEventCallback {}
181
/// Represents a time event and its associated handler.
///
/// `TimeEventHandlerV2` pairs a [`TimeEvent`] with the callback that should receive it;
/// calling `run` consumes the handler and passes the event to the callback.
#[repr(C)]
#[derive(Clone, Debug)]
pub struct TimeEventHandlerV2 {
    /// The time event.
    pub event: TimeEvent,
    /// The callable handler for the event.
    pub callback: TimeEventCallback,
}
194
195impl TimeEventHandlerV2 {
196    /// Creates a new [`TimeEventHandlerV2`] instance.
197    #[must_use]
198    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
199        Self { event, callback }
200    }
201
202    pub fn run(self) {
203        let Self { event, callback } = self;
204        callback.call(event);
205    }
206}
207
208impl PartialOrd for TimeEventHandlerV2 {
209    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
210        Some(self.cmp(other))
211    }
212}
213
214impl PartialEq for TimeEventHandlerV2 {
215    fn eq(&self, other: &Self) -> bool {
216        self.event.ts_event == other.event.ts_event
217    }
218}
219
/// Marker impl: handler equality is defined on `event.ts_event` only.
impl Eq for TimeEventHandlerV2 {}
221
222impl Ord for TimeEventHandlerV2 {
223    fn cmp(&self, other: &Self) -> Ordering {
224        self.event.ts_event.cmp(&other.event.ts_event)
225    }
226}
227
/// A test timer for use with a `TestClock`.
///
/// `TestTimer` simulates time progression in a controlled environment,
/// allowing for precise control over event generation in test scenarios.
#[derive(Clone, Copy, Debug)]
pub struct TestTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The time in UNIX nanoseconds at which the next event is scheduled.
    next_time_ns: UnixNanos,
    /// Whether the timer has expired or been cancelled; once set, no further events are produced.
    is_expired: bool,
}
245
246impl TestTimer {
247    /// Creates a new [`TestTimer`] instance.
248    ///
249    /// # Panics
250    ///
251    /// This function panics if `name` is not a valid string.
252    #[must_use]
253    pub fn new(
254        name: Ustr,
255        interval_ns: NonZeroU64,
256        start_time_ns: UnixNanos,
257        stop_time_ns: Option<UnixNanos>,
258    ) -> Self {
259        check_valid_string(name, stringify!(name)).expect(FAILED);
260
261        Self {
262            name,
263            interval_ns,
264            start_time_ns,
265            stop_time_ns,
266            next_time_ns: start_time_ns + interval_ns.get(),
267            is_expired: false,
268        }
269    }
270
271    /// Returns the next time in UNIX nanoseconds when the timer will fire.
272    #[must_use]
273    pub const fn next_time_ns(&self) -> UnixNanos {
274        self.next_time_ns
275    }
276
277    /// Returns whether the timer is expired.
278    #[must_use]
279    pub const fn is_expired(&self) -> bool {
280        self.is_expired
281    }
282
283    #[must_use]
284    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
285        TimeEvent {
286            name: self.name,
287            event_id,
288            ts_event: self.next_time_ns,
289            ts_init,
290        }
291    }
292
293    /// Advance the test timer forward to the given time, generating a sequence
294    /// of events. A [`TimeEvent`] is appended for each time a next event is
295    /// <= the given `to_time_ns`.
296    ///
297    /// This allows testing of multiple time intervals within a single step.
298    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
299        let advances = to_time_ns
300            .saturating_sub(self.next_time_ns.as_u64() - self.interval_ns.get())
301            / self.interval_ns.get();
302        self.take(advances as usize).map(|(event, _)| event)
303    }
304
305    /// Cancels the timer (the timer will not generate an event).
306    ///
307    /// Used to stop the timer before its scheduled stop time.
308    pub const fn cancel(&mut self) {
309        self.is_expired = true;
310    }
311}
312
313impl Iterator for TestTimer {
314    type Item = (TimeEvent, UnixNanos);
315
316    fn next(&mut self) -> Option<Self::Item> {
317        if self.is_expired {
318            None
319        } else {
320            let item = (
321                TimeEvent {
322                    name: self.name,
323                    event_id: UUID4::new(),
324                    ts_event: self.next_time_ns,
325                    ts_init: self.next_time_ns,
326                },
327                self.next_time_ns,
328            );
329
330            // If current next event time has exceeded stop time, then expire timer
331            if let Some(stop_time_ns) = self.stop_time_ns {
332                if self.next_time_ns >= stop_time_ns {
333                    self.is_expired = true;
334                }
335            }
336
337            self.next_time_ns += self.interval_ns;
338
339            Some(item)
340        }
341    }
342}
343
/// A live timer for use with a `LiveClock`.
///
/// `LiveTimer` triggers events at specified intervals in a real-time environment,
/// using Tokio's async runtime to handle scheduling and execution.
#[derive(Debug)]
pub struct LiveTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// The next scheduled event time in UNIX nanoseconds, shared with the spawned timer task.
    next_time_ns: Arc<AtomicU64>,
    /// The callback associated with the timer's events.
    callback: TimeEventCallback,
    /// Handle to the spawned timer task; `None` until the timer is started.
    task_handle: Option<JoinHandle<()>>,
    /// Shared heap onto which the timer task pushes its events.
    #[cfg(feature = "clock_v2")]
    heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
}
364
365impl LiveTimer {
366    /// Creates a new [`LiveTimer`] instance.
367    ///
368    /// # Panics
369    ///
370    /// This function panics if `name` is not a valid string.
371    #[must_use]
372    #[cfg(not(feature = "clock_v2"))]
373    pub fn new(
374        name: Ustr,
375        interval_ns: NonZeroU64,
376        start_time_ns: UnixNanos,
377        stop_time_ns: Option<UnixNanos>,
378        callback: TimeEventCallback,
379    ) -> Self {
380        check_valid_string(name, stringify!(name)).expect(FAILED);
381
382        log::debug!("Creating timer '{name}'");
383        Self {
384            name,
385            interval_ns,
386            start_time_ns,
387            stop_time_ns,
388            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
389            callback,
390            task_handle: None,
391        }
392    }
393
394    /// Creates a new [`LiveTimer`] instance.
395    ///
396    /// # Panics
397    ///
398    /// This function panics if `name` is not a valid string.
399    #[must_use]
400    #[cfg(feature = "clock_v2")]
401    pub fn new(
402        name: Ustr,
403        interval_ns: NonZeroU64,
404        start_time_ns: UnixNanos,
405        stop_time_ns: Option<UnixNanos>,
406        callback: TimeEventCallback,
407        heap: Arc<Mutex<BinaryHeap<TimeEvent>>>,
408    ) -> Self {
409        check_valid_string(name, stringify!(name)).expect(FAILED);
410
411        log::debug!("Creating timer '{name}'");
412        Self {
413            name,
414            interval_ns,
415            start_time_ns,
416            stop_time_ns,
417            next_time_ns: Arc::new(AtomicU64::new(start_time_ns.as_u64() + interval_ns.get())),
418            callback,
419            heap,
420            task_handle: None,
421        }
422    }
423
424    /// Returns the next time in UNIX nanoseconds when the timer will fire.
425    ///
426    /// Provides the scheduled time for the next event based on the current state of the timer.
427    #[must_use]
428    pub fn next_time_ns(&self) -> UnixNanos {
429        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
430    }
431
432    /// Returns whether the timer is expired.
433    ///
434    /// An expired timer will not trigger any further events.
435    /// A timer that has not been started is not expired.
436    #[must_use]
437    pub fn is_expired(&self) -> bool {
438        self.task_handle
439            .as_ref()
440            .is_some_and(tokio::task::JoinHandle::is_finished)
441    }
442
443    /// Starts the timer.
444    ///
445    /// Time events will begin triggering at the specified intervals.
446    /// The generated events are handled by the provided callback function.
447    #[allow(unused_variables)] // callback is used
448    pub fn start(&mut self) {
449        let event_name = self.name;
450        let stop_time_ns = self.stop_time_ns;
451        let interval_ns = self.interval_ns.get();
452        let callback = self.callback.clone();
453
454        // Get current time
455        let clock = get_atomic_clock_realtime();
456        let now_ns = clock.get_time_ns();
457
458        // Check if the timer's alert time is in the past and adjust if needed
459        let mut next_time_ns = self.next_time_ns.load(atomic::Ordering::SeqCst);
460        if next_time_ns <= now_ns {
461            log::warn!(
462                "Timer '{}' alert time {} was in the past, adjusted to current time for immediate fire",
463                event_name,
464                next_time_ns,
465            );
466            next_time_ns = now_ns.into();
467            self.next_time_ns
468                .store(now_ns.as_u64(), atomic::Ordering::SeqCst);
469        }
470
471        // Floor the next time to the nearest microsecond which is within the timers accuracy
472        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(next_time_ns));
473        let next_time_atomic = self.next_time_ns.clone();
474
475        #[cfg(feature = "clock_v2")]
476        let heap = self.heap.clone();
477
478        let rt = get_runtime();
479        let handle = rt.spawn(async move {
480            let clock = get_atomic_clock_realtime();
481
482            // 1-millisecond delay to account for the overhead of initializing a tokio timer
483            let overhead = Duration::from_millis(1);
484            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
485            let delay = Duration::from_nanos(delay_ns).saturating_sub(overhead);
486            let start = Instant::now() + delay;
487
488            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
489
490            loop {
491                // SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
492                // first then no tick has been consumed (no event was ready).
493                timer.tick().await;
494                let now_ns = clock.get_time_ns();
495
496                #[cfg(feature = "python")]
497                {
498                    match callback {
499                        TimeEventCallback::Python(ref callback) => {
500                            call_python_with_time_event(event_name, next_time_ns, now_ns, callback);
501                        }
502                        // Note: Clock v1 style path should not be called with Rust callback
503                        TimeEventCallback::Rust(_) => {}
504                    }
505                }
506
507                #[cfg(feature = "clock_v2")]
508                {
509                    let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
510                    heap.lock().await.push(event);
511                }
512
513                // Prepare next time interval
514                next_time_ns += interval_ns;
515                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
516
517                // Check if expired
518                if let Some(stop_time_ns) = stop_time_ns {
519                    if std::cmp::max(next_time_ns, now_ns) >= stop_time_ns {
520                        break; // Timer expired
521                    }
522                }
523            }
524        });
525
526        self.task_handle = Some(handle);
527    }
528
529    /// Cancels the timer.
530    ///
531    /// The timer will not generate a final event.
532    pub fn cancel(&mut self) {
533        log::debug!("Cancel timer '{}'", self.name);
534        if let Some(ref handle) = self.task_handle {
535            handle.abort();
536        }
537    }
538}
539
/// Invokes a Python `callback` with a freshly created [`TimeEvent`] wrapped in a `PyCapsule`.
///
/// The event is built from `name`, a new `UUID4`, `ts_event` and `ts_init`. An error
/// returned by the callback is logged rather than propagated.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(
    name: Ustr,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
    callback: &PyObject,
) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::with_gil(|py| {
        // Package a new time event into a capsule for the Python side
        let time_event = TimeEvent::new(name, UUID4::new(), ts_event, ts_init);
        let capsule: PyObject = PyCapsule::new(py, time_event, None)
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            tracing::error!("Error on callback: {e:?}");
        }
    });
}
563
564////////////////////////////////////////////////////////////////////////////////
565// Tests
566////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use rstest::*;
    use ustr::Ustr;

    use super::{TestTimer, TimeEvent};

    /// Builds a `TestTimer` named "TEST_TIMER" from plain nanosecond values.
    fn test_timer(interval_ns: u64, start_time_ns: u64, stop_time_ns: Option<u64>) -> TestTimer {
        TestTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
        )
    }

    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = test_timer(1, 1, None);

        // Events keep coming until the timer is marked as expired
        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = test_timer(5, 0, None);

        for to_time_ns in 1..=3_u64 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }

        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = test_timer(1, 0, None);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = test_timer(1, 0, Some(2));

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = test_timer(1, 0, Some(5));

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = test_timer(1, 0, Some(5));

        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = test_timer(5, 0, None);

        let first: Vec<TimeEvent> = timer.advance(UnixNanos::from(5)).collect();
        assert_eq!(first.len(), 1, "Expected one event at the 5 ns boundary");

        let second: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(second.len(), 1, "Expected one event at the 10 ns boundary");
    }
}