1use std::{
19 num::NonZeroU64,
20 sync::{
21 Arc,
22 atomic::{self, AtomicU64},
23 },
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::consts::NAUTILUS_PREFIX;
28use nautilus_core::{
29 UUID4, UnixNanos,
30 correctness::{FAILED, check_valid_string_utf8},
31 datetime::floor_to_nearest_microsecond,
32 time::get_atomic_clock_realtime,
33};
34#[cfg(feature = "python")]
35use pyo3::{Py, PyAny, Python};
36use tokio::{
37 task::JoinHandle,
38 time::{Duration, Instant},
39};
40use ustr::Ustr;
41
42use super::runtime::get_runtime;
43use crate::{
44 runner::TimeEventSender,
45 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
46};
47
48#[derive(Debug)]
57pub struct LiveTimer {
58 pub name: Ustr,
60 pub interval_ns: NonZeroU64,
62 pub start_time_ns: UnixNanos,
64 pub stop_time_ns: Option<UnixNanos>,
66 pub fire_immediately: bool,
68 next_time_ns: Arc<AtomicU64>,
69 callback: TimeEventCallback,
70 task_handle: Option<JoinHandle<()>>,
71 sender: Option<Arc<dyn TimeEventSender>>,
72}
73
74impl LiveTimer {
75 #[allow(clippy::too_many_arguments)]
81 #[must_use]
82 pub fn new(
83 name: Ustr,
84 interval_ns: NonZeroU64,
85 start_time_ns: UnixNanos,
86 stop_time_ns: Option<UnixNanos>,
87 callback: TimeEventCallback,
88 fire_immediately: bool,
89 sender: Option<Arc<dyn TimeEventSender>>,
90 ) -> Self {
91 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
92
93 let next_time_ns = if fire_immediately {
94 start_time_ns.as_u64()
95 } else {
96 start_time_ns.as_u64() + interval_ns.get()
97 };
98
99 log::debug!("Creating timer '{name}'");
100
101 Self {
102 name,
103 interval_ns,
104 start_time_ns,
105 stop_time_ns,
106 fire_immediately,
107 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
108 callback,
109 task_handle: None,
110 sender,
111 }
112 }
113
114 #[must_use]
118 pub fn next_time_ns(&self) -> UnixNanos {
119 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
120 }
121
122 #[must_use]
127 pub fn is_expired(&self) -> bool {
128 self.task_handle
129 .as_ref()
130 .is_some_and(tokio::task::JoinHandle::is_finished)
131 }
132
133 #[allow(unused_variables)]
142 pub fn start(&mut self) {
143 let event_name = self.name;
144 let stop_time_ns = self.stop_time_ns;
145 let interval_ns = self.interval_ns.get();
146 let callback = self.callback.clone();
147
148 let clock = get_atomic_clock_realtime();
150 let now_ns = clock.get_time_ns();
151
152 let now_raw = now_ns.as_u64();
154 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
155
156 if observed_next <= now_raw {
157 loop {
158 match self.next_time_ns.compare_exchange(
159 observed_next,
160 now_raw,
161 atomic::Ordering::SeqCst,
162 atomic::Ordering::SeqCst,
163 ) {
164 Ok(_) => {
165 if observed_next < now_raw {
166 let original = UnixNanos::from(observed_next);
167 log::warn!(
168 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
169 original.to_rfc3339(),
170 );
171 }
172 observed_next = now_raw;
173 break;
174 }
175 Err(actual) => {
176 observed_next = actual;
177 if observed_next > now_raw {
178 break;
179 }
180 }
181 }
182 }
183 }
184
185 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
187 let next_time_atomic = self.next_time_ns.clone();
188 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
189
190 let sender = self.sender.clone();
191
192 let rt = get_runtime();
193 let handle = rt.spawn(async move {
194 let clock = get_atomic_clock_realtime();
195
196 let overhead = Duration::from_millis(1);
198 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
199 let mut delay = Duration::from_nanos(delay_ns);
200
201 if delay > overhead {
203 delay -= overhead;
204 } else {
205 delay = Duration::from_nanos(0);
206 }
207
208 let start = Instant::now() + delay;
209
210 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
211
212 loop {
213 timer.tick().await;
216 let now_ns = clock.get_time_ns();
217
218 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
219
220 match callback {
221 #[cfg(feature = "python")]
222 TimeEventCallback::Python(ref callback) => {
223 call_python_with_time_event(event, callback);
224 }
225 TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
226 debug_assert!(
227 sender.is_some(),
228 "LiveTimer with Rust callback requires TimeEventSender"
229 );
230 let sender = sender
231 .as_ref()
232 .expect("timer event sender was unset for Rust callback system");
233
234 let handler = TimeEventHandler::new(event, callback.clone());
240 sender.send(handler);
241 }
242 }
243
244 next_time_ns += interval_ns;
246 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
247
248 if let Some(stop_time_ns) = stop_time_ns
250 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
251 {
252 break; }
254 }
255 });
256
257 self.task_handle = Some(handle);
258 }
259
260 pub fn cancel(&mut self) {
264 log::debug!("Cancel timer '{}'", self.name);
265 if let Some(ref handle) = self.task_handle {
266 handle.abort();
267 }
268 }
269}
270
/// Invokes a Python callable with `event` wrapped in a `PyCapsule`.
///
/// An error raised by the callable is printed to stderr and otherwise ignored.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        // The capsule is created with a no-op destructor
        let bound_capsule = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`");
        let capsule: Py<PyAny> = bound_capsule.into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            eprintln!("{NAUTILUS_PREFIX} Error on callback: {e:?}");
        }
    });
}
294
295#[cfg(test)]
296mod tests {
297 use std::{num::NonZeroU64, sync::Arc};
298
299 use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
300 use rstest::*;
301 use ustr::Ustr;
302
303 use super::LiveTimer;
304 use crate::{
305 runner::TimeEventSender,
306 timer::{TimeEventCallback, TimeEventHandler},
307 };
308
309 #[rstest]
310 fn test_live_timer_fire_immediately_field() {
311 let timer = LiveTimer::new(
312 Ustr::from("TEST_TIMER"),
313 NonZeroU64::new(1000).unwrap(),
314 UnixNanos::from(100),
315 None,
316 TimeEventCallback::from(|_| {}),
317 true, None, );
320
321 assert!(timer.fire_immediately);
323
324 assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
326 }
327
328 #[rstest]
329 fn test_live_timer_fire_immediately_false_field() {
330 let timer = LiveTimer::new(
331 Ustr::from("TEST_TIMER"),
332 NonZeroU64::new(1000).unwrap(),
333 UnixNanos::from(100),
334 None,
335 TimeEventCallback::from(|_| {}),
336 false, None, );
339
340 assert!(!timer.fire_immediately);
342
343 assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
345 }
346
347 #[rstest]
348 fn test_live_timer_adjusts_past_due_start_time() {
349 #[derive(Debug)]
350 struct NoopSender;
351
352 impl TimeEventSender for NoopSender {
353 fn send(&self, _handler: TimeEventHandler) {}
354 }
355
356 let sender = Arc::new(NoopSender);
357 let mut timer = LiveTimer::new(
358 Ustr::from("PAST_TIMER"),
359 NonZeroU64::new(1).unwrap(),
360 UnixNanos::from(0),
361 None,
362 TimeEventCallback::from(|_| {}),
363 true,
364 Some(sender),
365 );
366
367 let before = get_atomic_clock_realtime().get_time_ns();
368
369 timer.start();
370
371 assert!(timer.next_time_ns() >= before);
372
373 timer.cancel();
374 }
375}