nautilus_common/live/
clock.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live clock implementation using Tokio for real-time operations.
17
18use std::{
19    collections::BinaryHeap,
20    ops::Deref,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24};
25
26use ahash::AHashMap;
27use futures::Stream;
28use nautilus_core::{
29    AtomicTime, UnixNanos, correctness::check_predicate_true, time::get_atomic_clock_realtime,
30};
31use ustr::Ustr;
32
33use super::timer::LiveTimer;
34use crate::{
35    clock::{CallbackRegistry, Clock, validate_and_prepare_time_alert, validate_and_prepare_timer},
36    runner::{TimeEventSender, try_get_time_event_sender},
37    timer::{
38        ScheduledTimeEvent, TimeEvent, TimeEventCallback, TimeEventHandlerV2, create_valid_interval,
39    },
40};
41
/// A real-time clock which uses system time.
///
/// Timestamps are guaranteed to be unique and monotonically increasing.
///
/// # Threading
///
/// The clock holds thread-local runtime state and must remain on its originating thread.
#[derive(Debug)]
pub struct LiveClock {
    /// Time source; `new` initializes it from `get_atomic_clock_realtime()`.
    time: &'static AtomicTime,
    /// Timers owned by this clock, keyed by timer name.
    timers: AHashMap<Ustr, LiveTimer>,
    /// Registry holding the default handler and the callbacks registered per timer name.
    callbacks: CallbackRegistry,
    /// Optional event sender; a clone is passed to every `LiveTimer` the clock creates.
    sender: Option<Arc<dyn TimeEventSender>>,
}
56
57impl LiveClock {
58    /// Creates a new [`LiveClock`] instance.
59    #[must_use]
60    pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
61        Self {
62            time: get_atomic_clock_realtime(),
63            timers: AHashMap::new(),
64            callbacks: CallbackRegistry::new(),
65            sender,
66        }
67    }
68
69    #[must_use]
70    pub const fn get_timers(&self) -> &AHashMap<Ustr, LiveTimer> {
71        &self.timers
72    }
73
74    fn clear_expired_timers(&mut self) {
75        self.timers.retain(|_, timer| !timer.is_expired());
76    }
77
78    fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
79        if self.timer_exists(name) {
80            self.cancel_timer(name.as_str());
81            log::warn!("Timer '{name}' replaced");
82        }
83    }
84}
85
86impl Default for LiveClock {
87    /// Creates a new default [`LiveClock`] instance.
88    ///
89    /// Uses `try_get_time_event_sender()` to allow creation before channels are initialized.
90    fn default() -> Self {
91        Self::new(try_get_time_event_sender())
92    }
93}
94
// Exposes the backing `AtomicTime` through the clock itself.
// NOTE(review): calls such as `self.get_time_ns()` in the `Clock` impl presumably
// resolve through this impl — confirm no trait method of the same name exists.
impl Deref for LiveClock {
    type Target = AtomicTime;

    /// Returns the `AtomicTime` this clock reads its timestamps from.
    fn deref(&self) -> &Self::Target {
        self.time
    }
}
102
103impl Clock for LiveClock {
104    fn timestamp_ns(&self) -> UnixNanos {
105        self.time.get_time_ns()
106    }
107
108    fn timestamp_us(&self) -> u64 {
109        self.time.get_time_us()
110    }
111
112    fn timestamp_ms(&self) -> u64 {
113        self.time.get_time_ms()
114    }
115
116    fn timestamp(&self) -> f64 {
117        self.time.get_time()
118    }
119
120    fn timer_names(&self) -> Vec<&str> {
121        self.timers
122            .iter()
123            .filter(|(_, timer)| !timer.is_expired())
124            .map(|(k, _)| k.as_str())
125            .collect()
126    }
127
128    fn timer_count(&self) -> usize {
129        self.timers
130            .iter()
131            .filter(|(_, timer)| !timer.is_expired())
132            .count()
133    }
134
135    fn timer_exists(&self, name: &Ustr) -> bool {
136        self.timers.contains_key(name)
137    }
138
139    fn register_default_handler(&mut self, handler: TimeEventCallback) {
140        self.callbacks.register_default_handler(handler);
141    }
142
143    /// # Panics
144    ///
145    /// This function panics if:
146    /// - The event does not have an associated handler (see trait documentation).
147    #[allow(unused_variables)]
148    fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
149        self.callbacks.get_handler(event)
150    }
151
152    fn set_time_alert_ns(
153        &mut self,
154        name: &str,
155        alert_time_ns: UnixNanos,
156        callback: Option<TimeEventCallback>,
157        allow_past: Option<bool>,
158    ) -> anyhow::Result<()> {
159        let ts_now = self.get_time_ns();
160        let (name, alert_time_ns) =
161            validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
162
163        self.replace_existing_timer_if_needed(&name);
164
165        check_predicate_true(
166            callback.is_some() | self.callbacks.has_any_callback(&name),
167            "No callbacks provided",
168        )?;
169
170        let callback = if let Some(callback) = callback {
171            self.callbacks.register_callback(name, callback.clone());
172            callback
173        } else {
174            self.callbacks
175                .get_callback(&name)
176                .expect("Callback should exist")
177        };
178
179        // Safe to calculate interval now that we've ensured alert_time_ns >= ts_now
180        let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
181
182        let mut timer = LiveTimer::new(
183            name,
184            interval_ns,
185            ts_now,
186            Some(alert_time_ns),
187            callback,
188            false,
189            self.sender.clone(),
190        );
191
192        timer.start();
193
194        self.clear_expired_timers();
195        self.timers.insert(name, timer);
196
197        Ok(())
198    }
199
200    fn set_timer_ns(
201        &mut self,
202        name: &str,
203        interval_ns: u64,
204        start_time_ns: Option<UnixNanos>,
205        stop_time_ns: Option<UnixNanos>,
206        callback: Option<TimeEventCallback>,
207        allow_past: Option<bool>,
208        fire_immediately: Option<bool>,
209    ) -> anyhow::Result<()> {
210        let ts_now = self.get_time_ns();
211        let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
212            validate_and_prepare_timer(
213                name,
214                interval_ns,
215                start_time_ns,
216                stop_time_ns,
217                allow_past,
218                fire_immediately,
219                ts_now,
220            )?;
221
222        check_predicate_true(
223            callback.is_some() | self.callbacks.has_any_callback(&name),
224            "No callbacks provided",
225        )?;
226
227        self.replace_existing_timer_if_needed(&name);
228
229        let callback = if let Some(callback) = callback {
230            self.callbacks.register_callback(name, callback.clone());
231            callback
232        } else {
233            self.callbacks
234                .get_callback(&name)
235                .expect("Callback should exist")
236        };
237
238        let interval_ns = create_valid_interval(interval_ns);
239
240        let mut timer = LiveTimer::new(
241            name,
242            interval_ns,
243            start_time_ns,
244            stop_time_ns,
245            callback,
246            fire_immediately,
247            self.sender.clone(),
248        );
249        timer.start();
250
251        self.clear_expired_timers();
252        self.timers.insert(name, timer);
253
254        Ok(())
255    }
256
257    fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
258        self.timers
259            .get(&Ustr::from(name))
260            .map(|timer| timer.next_time_ns())
261    }
262
263    fn cancel_timer(&mut self, name: &str) {
264        let timer = self.timers.remove(&Ustr::from(name));
265        if let Some(mut timer) = timer {
266            timer.cancel();
267        }
268    }
269
270    fn cancel_timers(&mut self) {
271        for timer in &mut self.timers.values_mut() {
272            timer.cancel();
273        }
274
275        self.timers.clear();
276    }
277
278    fn reset(&mut self) {
279        self.cancel_timers();
280        self.callbacks.clear();
281    }
282}
283
/// Helper struct to stream events from the heap.
///
/// Each successful poll pops one [`ScheduledTimeEvent`] from the shared heap and
/// yields its inner [`TimeEvent`].
#[derive(Debug)]
pub struct TimeEventStream {
    /// Shared heap of scheduled events, guarded by a Tokio mutex.
    heap: Arc<tokio::sync::Mutex<BinaryHeap<ScheduledTimeEvent>>>,
}
289
290impl TimeEventStream {
291    pub const fn new(heap: Arc<tokio::sync::Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
292        Self { heap }
293    }
294}
295
296impl Stream for TimeEventStream {
297    type Item = TimeEvent;
298
299    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
300        let mut heap = match self.heap.try_lock() {
301            Ok(guard) => guard,
302            Err(e) => {
303                tracing::error!("Unable to get LiveClock heap lock: {e}");
304                cx.waker().wake_by_ref();
305                return Poll::Pending;
306            }
307        };
308
309        if let Some(event) = heap.pop() {
310            Poll::Ready(Some(event.into_inner()))
311        } else {
312            cx.waker().wake_by_ref();
313            Poll::Pending
314        }
315    }
316}
317
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        time::Duration,
    };

    use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        clock::Clock,
        runner::TimeEventSender,
        testing::wait_until,
        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
    };

    /// Shared log of `(event, time the sender observed it)` pairs.
    type EventLog = Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>;

    /// Test sender which runs each callback inline and records the event it saw.
    #[derive(Debug)]
    struct CollectingSender {
        events: EventLog,
    }

    impl CollectingSender {
        fn new(events: EventLog) -> Self {
            Self { events }
        }
    }

    impl TimeEventSender for CollectingSender {
        fn send(&self, handler: TimeEventHandlerV2) {
            let TimeEventHandlerV2 { event, callback } = handler;

            // Sample the clock before invoking the callback so the recorded time
            // reflects when the event reached the sender.
            let received_at = get_atomic_clock_realtime().get_time_ns();
            let recorded = event.clone();
            callback.call(event);

            let mut log = self.events.lock().expect(MUTEX_POISONED);
            log.push((recorded, received_at));
        }
    }

    /// Waits until at least `target` events have been recorded, or `timeout` elapses.
    fn wait_for_events(events: &EventLog, target: usize, timeout: Duration) {
        let reached_target = || {
            let log = events.lock().expect(MUTEX_POISONED);
            log.len() >= target
        };
        wait_until(reached_target, timeout);
    }

    /// Builds a clock wired to a [`CollectingSender`] with a no-op default handler.
    fn clock_with_collector() -> (LiveClock, EventLog) {
        let events: EventLog = Arc::new(Mutex::new(Vec::new()));
        let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));

        let mut clock = LiveClock::new(Some(sender));
        clock.register_default_handler(TimeEventCallback::from(|_| {}));

        (clock, events)
    }

    /// Lets `duration` pass by polling `wait_until` against a stopwatch.
    fn settle(duration: Duration) {
        let started = std::time::Instant::now();
        wait_until(|| started.elapsed() >= duration, Duration::from_secs(2));
    }

    #[rstest]
    fn test_live_clock_timer_replacement_cancels_previous_task() {
        let (mut clock, events) = clock_with_collector();

        let fast_interval = Duration::from_millis(10).as_nanos() as u64;
        let slow_interval = Duration::from_millis(30).as_nanos() as u64;

        clock
            .set_timer_ns("replace", fast_interval, None, None, None, None, None)
            .unwrap();

        wait_for_events(&events, 2, Duration::from_millis(200));
        events.lock().expect(MUTEX_POISONED).clear();

        // Same name, slower cadence: the fast timer must stop firing
        clock
            .set_timer_ns("replace", slow_interval, None, None, None, None, None)
            .unwrap();

        wait_for_events(&events, 3, Duration::from_millis(300));

        let snapshot = events.lock().expect(MUTEX_POISONED).clone();
        let gaps: Vec<u64> = snapshot
            .windows(2)
            .map(|pair| {
                let (earlier, later) = (&pair[0].0, &pair[1].0);
                later.ts_event.as_u64() - earlier.ts_event.as_u64()
            })
            .collect();

        assert!(!gaps.is_empty());
        for gap in gaps {
            assert_ne!(gap, fast_interval);
        }

        clock.cancel_timers();
    }

    #[rstest]
    fn test_live_clock_time_alert_persists_callback() {
        let (mut clock, _events) = clock_with_collector();

        let alert_time = clock.timestamp_ns() + 1_000_u64;

        clock
            .set_time_alert_ns("alert-callback", alert_time, None, None)
            .unwrap();

        let alert_name = Ustr::from("alert-callback");
        assert!(clock.callbacks.has_any_callback(&alert_name));

        clock.cancel_timers();
    }

    #[rstest]
    fn test_live_clock_reset_stops_active_timers() {
        let (mut clock, events) = clock_with_collector();

        let interval_ns = Duration::from_millis(15).as_nanos() as u64;
        clock
            .set_timer_ns("reset-test", interval_ns, None, None, None, None, None)
            .unwrap();

        wait_for_events(&events, 2, Duration::from_millis(250));

        clock.reset();

        // Wait for any in-flight events to arrive
        settle(Duration::from_millis(50));

        // Clear any events that arrived before reset took effect
        events.lock().expect(MUTEX_POISONED).clear();

        // Verify no new events arrive (timer should be stopped)
        settle(Duration::from_millis(50));
        assert!(events.lock().expect(MUTEX_POISONED).is_empty());
    }

    #[rstest]
    fn test_live_timer_short_delay_not_early() {
        let (mut clock, events) = clock_with_collector();

        let now = clock.timestamp_ns();
        let start_time = UnixNanos::from(*now + 500_000); // 0.5 ms in the future
        let interval_ns = 1_000_000;

        clock
            .set_timer_ns(
                "short-delay",
                interval_ns,
                Some(start_time),
                None,
                None,
                None,
                Some(true),
            )
            .unwrap();

        wait_for_events(&events, 1, Duration::from_millis(100));

        let snapshot = events.lock().expect(MUTEX_POISONED).clone();
        assert!(!snapshot.is_empty());

        // No event may be observed before its scheduled timestamp
        assert!(
            snapshot
                .iter()
                .all(|(event, actual_ts)| actual_ts.as_u64() >= event.ts_event.as_u64())
        );

        clock.cancel_timers();
    }
}