nautilus_common/live/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live timer implementation using Tokio for real-time scheduling.
17
18use std::{
19    num::NonZeroU64,
20    sync::{
21        Arc,
22        atomic::{self, AtomicU64},
23    },
24};
25
26use nautilus_core::{
27    UUID4, UnixNanos,
28    correctness::{FAILED, check_valid_string_utf8},
29    datetime::floor_to_nearest_microsecond,
30    time::get_atomic_clock_realtime,
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, Python};
34use tokio::{
35    task::JoinHandle,
36    time::{Duration, Instant},
37};
38use ustr::Ustr;
39
40use super::runtime::get_runtime;
41use crate::{
42    runner::TimeEventSender,
43    timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
44};
45
46/// A live timer for use with a `LiveClock`.
47///
48/// `LiveTimer` triggers events at specified intervals in a real-time environment,
49/// using Tokio's async runtime to handle scheduling and execution.
50///
51/// # Threading
52///
53/// The timer runs on the runtime thread that created it and dispatches events across threads as needed.
54#[derive(Debug)]
55pub struct LiveTimer {
56    /// The name of the timer.
57    pub name: Ustr,
58    /// The start time of the timer in UNIX nanoseconds.
59    pub interval_ns: NonZeroU64,
60    /// The start time of the timer in UNIX nanoseconds.
61    pub start_time_ns: UnixNanos,
62    /// The optional stop time of the timer in UNIX nanoseconds.
63    pub stop_time_ns: Option<UnixNanos>,
64    /// If the timer should fire immediately at start time.
65    pub fire_immediately: bool,
66    next_time_ns: Arc<AtomicU64>,
67    callback: TimeEventCallback,
68    task_handle: Option<JoinHandle<()>>,
69    sender: Option<Arc<dyn TimeEventSender>>,
70}
71
72impl LiveTimer {
73    /// Creates a new [`LiveTimer`] instance.
74    ///
75    /// # Panics
76    ///
77    /// Panics if `name` is not a valid string.
78    #[allow(clippy::too_many_arguments)]
79    #[must_use]
80    pub fn new(
81        name: Ustr,
82        interval_ns: NonZeroU64,
83        start_time_ns: UnixNanos,
84        stop_time_ns: Option<UnixNanos>,
85        callback: TimeEventCallback,
86        fire_immediately: bool,
87        sender: Option<Arc<dyn TimeEventSender>>,
88    ) -> Self {
89        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
90
91        let next_time_ns = if fire_immediately {
92            start_time_ns.as_u64()
93        } else {
94            start_time_ns.as_u64() + interval_ns.get()
95        };
96
97        log::debug!("Creating timer '{name}'");
98
99        Self {
100            name,
101            interval_ns,
102            start_time_ns,
103            stop_time_ns,
104            fire_immediately,
105            next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
106            callback,
107            task_handle: None,
108            sender,
109        }
110    }
111
112    /// Returns the next time in UNIX nanoseconds when the timer will fire.
113    ///
114    /// Provides the scheduled time for the next event based on the current state of the timer.
115    #[must_use]
116    pub fn next_time_ns(&self) -> UnixNanos {
117        UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
118    }
119
120    /// Returns whether the timer is expired.
121    ///
122    /// An expired timer will not trigger any further events.
123    /// A timer that has not been started is not expired.
124    #[must_use]
125    pub fn is_expired(&self) -> bool {
126        self.task_handle
127            .as_ref()
128            .is_some_and(tokio::task::JoinHandle::is_finished)
129    }
130
131    /// Starts the timer.
132    ///
133    /// Time events will begin triggering at the specified intervals.
134    /// The generated events are handled by the provided callback function.
135    ///
136    /// # Panics
137    ///
138    /// - Panics if a Rust callback is used. Rust callbacks use `Rc` internally which is not
139    ///   thread-safe. Use Python callbacks for live/async contexts, or use `TestClock` for
140    ///   Rust callbacks in single-threaded backtesting.
141    /// - Panics if Rust-based callback system is active and no time event sender has been set.
142    #[allow(unused_variables)]
143    pub fn start(&mut self) {
144        // SAFETY: Rust callbacks use Rc which is not Send/Sync. They cannot be safely
145        // moved into async tasks. This check enforces the invariant documented in
146        // TimeEventCallback's unsafe Send/Sync implementations.
147        // In tests, we allow Rust callbacks since the test environment is controlled.
148        #[cfg(not(test))]
149        assert!(
150            !self.callback.is_rust(),
151            "LiveTimer cannot use Rust callbacks (they are not thread-safe). \
152             Use Python callbacks for live trading, or TestClock for backtesting."
153        );
154
155        let event_name = self.name;
156        let stop_time_ns = self.stop_time_ns;
157        let interval_ns = self.interval_ns.get();
158        let callback = self.callback.clone();
159
160        // Get current time
161        let clock = get_atomic_clock_realtime();
162        let now_ns = clock.get_time_ns();
163
164        // Check if the timer's alert time is in the past and adjust if needed
165        let now_raw = now_ns.as_u64();
166        let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
167
168        if observed_next <= now_raw {
169            loop {
170                match self.next_time_ns.compare_exchange(
171                    observed_next,
172                    now_raw,
173                    atomic::Ordering::SeqCst,
174                    atomic::Ordering::SeqCst,
175                ) {
176                    Ok(_) => {
177                        if observed_next < now_raw {
178                            let original = UnixNanos::from(observed_next);
179                            log::warn!(
180                                "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
181                                original.to_rfc3339(),
182                            );
183                        }
184                        observed_next = now_raw;
185                        break;
186                    }
187                    Err(actual) => {
188                        observed_next = actual;
189                        if observed_next > now_raw {
190                            break;
191                        }
192                    }
193                }
194            }
195        }
196
197        // Floor the next time to the nearest microsecond which is within the timers accuracy
198        let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
199        let next_time_atomic = self.next_time_ns.clone();
200        next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
201
202        let sender = self.sender.clone();
203
204        let rt = get_runtime();
205        let handle = rt.spawn(async move {
206            let clock = get_atomic_clock_realtime();
207
208            // 1-millisecond delay to account for the overhead of initializing a tokio timer
209            let overhead = Duration::from_millis(1);
210            let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
211            let mut delay = Duration::from_nanos(delay_ns);
212
213            // Subtract the estimated startup overhead; saturating to zero for sub-ms delays
214            if delay > overhead {
215                delay -= overhead;
216            } else {
217                delay = Duration::from_nanos(0);
218            }
219
220            let start = Instant::now() + delay;
221
222            let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
223
224            loop {
225                // SAFETY: `timer.tick` is cancellation safe, if the cancel branch completes
226                // first then no tick has been consumed (no event was ready).
227                timer.tick().await;
228                let now_ns = clock.get_time_ns();
229
230                let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
231
232                match callback {
233                    #[cfg(feature = "python")]
234                    TimeEventCallback::Python(ref callback) => {
235                        call_python_with_time_event(event, callback);
236                    }
237                    TimeEventCallback::Rust(_) => {
238                        debug_assert!(
239                            sender.is_some(),
240                            "LiveTimer with Rust callback requires TimeEventSender"
241                        );
242                        let sender = sender
243                            .as_ref()
244                            .expect("timer event sender was unset for Rust callback system");
245                        let handler = TimeEventHandlerV2::new(event, callback.clone());
246                        sender.send(handler);
247                    }
248                }
249
250                // Prepare next time interval
251                next_time_ns += interval_ns;
252                next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
253
254                // Check if expired
255                if let Some(stop_time_ns) = stop_time_ns
256                    && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
257                {
258                    break; // Timer expired
259                }
260            }
261        });
262
263        self.task_handle = Some(handle);
264    }
265
266    /// Cancels the timer.
267    ///
268    /// The timer will not generate a final event.
269    pub fn cancel(&mut self) {
270        log::debug!("Cancel timer '{}'", self.name);
271        if let Some(ref handle) = self.task_handle {
272            handle.abort();
273        }
274    }
275}
276
277#[cfg(feature = "python")]
278fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
279    use nautilus_core::python::IntoPyObjectNautilusExt;
280    use pyo3::types::PyCapsule;
281
282    Python::attach(|py| {
283        // Create a new PyCapsule that owns `event` and registers a destructor so
284        // the contained `TimeEvent` is properly freed once the capsule is
285        // garbage-collected by Python. Without the destructor the memory would
286        // leak because the capsule would not know how to drop the Rust value.
287
288        // Register a destructor that simply drops the `TimeEvent` once the
289        // capsule is freed on the Python side.
290        let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
291            .expect("Error creating `PyCapsule`")
292            .into_py_any_unwrap(py);
293
294        match callback.call1(py, (capsule,)) {
295            Ok(_) => {}
296            Err(e) => tracing::error!("Error on callback: {e:?}"),
297        }
298    });
299}
300
301#[cfg(test)]
302mod tests {
303    use std::{num::NonZeroU64, sync::Arc};
304
305    use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
306    use rstest::*;
307    use ustr::Ustr;
308
309    use super::LiveTimer;
310    use crate::{
311        runner::TimeEventSender,
312        timer::{TimeEventCallback, TimeEventHandlerV2},
313    };
314
315    #[rstest]
316    fn test_live_timer_fire_immediately_field() {
317        let timer = LiveTimer::new(
318            Ustr::from("TEST_TIMER"),
319            NonZeroU64::new(1000).unwrap(),
320            UnixNanos::from(100),
321            None,
322            TimeEventCallback::from(|_| {}),
323            true, // fire_immediately = true
324            None, // time_event_sender
325        );
326
327        // Verify the field is set correctly
328        assert!(timer.fire_immediately);
329
330        // With fire_immediately=true, next_time_ns should be start_time_ns
331        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
332    }
333
334    #[rstest]
335    fn test_live_timer_fire_immediately_false_field() {
336        let timer = LiveTimer::new(
337            Ustr::from("TEST_TIMER"),
338            NonZeroU64::new(1000).unwrap(),
339            UnixNanos::from(100),
340            None,
341            TimeEventCallback::from(|_| {}),
342            false, // fire_immediately = false
343            None,  // time_event_sender
344        );
345
346        // Verify the field is set correctly
347        assert!(!timer.fire_immediately);
348
349        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
350        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
351    }
352
353    #[rstest]
354    fn test_live_timer_adjusts_past_due_start_time() {
355        #[derive(Debug)]
356        struct NoopSender;
357
358        impl TimeEventSender for NoopSender {
359            fn send(&self, _handler: TimeEventHandlerV2) {}
360        }
361
362        let sender = Arc::new(NoopSender);
363        let mut timer = LiveTimer::new(
364            Ustr::from("PAST_TIMER"),
365            NonZeroU64::new(1).unwrap(),
366            UnixNanos::from(0),
367            None,
368            TimeEventCallback::from(|_| {}),
369            true,
370            Some(sender),
371        );
372
373        let before = get_atomic_clock_realtime().get_time_ns();
374
375        timer.start();
376
377        assert!(timer.next_time_ns() >= before);
378
379        timer.cancel();
380    }
381}