1use std::{
19 num::NonZeroU64,
20 sync::{
21 Arc,
22 atomic::{self, AtomicU64},
23 },
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::consts::NAUTILUS_PREFIX;
28use nautilus_core::{
29 UUID4, UnixNanos,
30 correctness::{FAILED, check_valid_string_utf8},
31 datetime::floor_to_nearest_microsecond,
32 time::get_atomic_clock_realtime,
33};
34#[cfg(feature = "python")]
35use pyo3::{Py, PyAny, Python};
36use tokio::{
37 task::JoinHandle,
38 time::{Duration, Instant},
39};
40use ustr::Ustr;
41
42use super::runtime::get_runtime;
43use crate::{
44 runner::TimeEventSender,
45 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
46};
47
/// A timer that emits time events from a task spawned on the async runtime.
///
/// The timer is configured by [`LiveTimer::new`] and does nothing until
/// [`LiveTimer::start`] spawns its background task.
#[derive(Debug)]
pub struct LiveTimer {
    /// The timer name; used as the name of every event the timer emits.
    pub name: Ustr,
    /// The interval between timer ticks, in nanoseconds (never zero).
    pub interval_ns: NonZeroU64,
    /// The start time in UNIX nanoseconds from which the first fire time is derived.
    pub start_time_ns: UnixNanos,
    /// The optional stop time in UNIX nanoseconds; `None` means the task has no stop condition.
    pub stop_time_ns: Option<UnixNanos>,
    /// If `true` the first fire time is `start_time_ns`, otherwise `start_time_ns + interval_ns`.
    pub fire_immediately: bool,
    // Next scheduled fire time in UNIX nanoseconds, shared with the spawned task.
    next_time_ns: Arc<AtomicU64>,
    // Callback invoked (Python) or dispatched through `sender` (Rust) on every tick.
    callback: TimeEventCallback,
    // Handle of the spawned timer task; `None` until `start` has been called.
    task_handle: Option<JoinHandle<()>>,
    // Channel used to deliver events for Rust callbacks; the task panics on a tick if it is `None`.
    sender: Option<Arc<dyn TimeEventSender>>,
}
73
74impl LiveTimer {
75 #[allow(clippy::too_many_arguments)]
81 #[must_use]
82 pub fn new(
83 name: Ustr,
84 interval_ns: NonZeroU64,
85 start_time_ns: UnixNanos,
86 stop_time_ns: Option<UnixNanos>,
87 callback: TimeEventCallback,
88 fire_immediately: bool,
89 sender: Option<Arc<dyn TimeEventSender>>,
90 ) -> Self {
91 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
92
93 let next_time_ns = if fire_immediately {
94 start_time_ns.as_u64()
95 } else {
96 start_time_ns.as_u64() + interval_ns.get()
97 };
98
99 log::debug!("Creating timer '{name}'");
100
101 Self {
102 name,
103 interval_ns,
104 start_time_ns,
105 stop_time_ns,
106 fire_immediately,
107 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
108 callback,
109 task_handle: None,
110 sender,
111 }
112 }
113
114 #[must_use]
118 pub fn next_time_ns(&self) -> UnixNanos {
119 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
120 }
121
122 #[must_use]
127 pub fn is_expired(&self) -> bool {
128 self.task_handle
129 .as_ref()
130 .is_some_and(tokio::task::JoinHandle::is_finished)
131 }
132
133 #[allow(unused_variables)]
142 pub fn start(&mut self) {
143 let event_name = self.name;
144 let stop_time_ns = self.stop_time_ns;
145 let interval_ns = self.interval_ns.get();
146 let callback = self.callback.clone();
147
148 let clock = get_atomic_clock_realtime();
150 let now_ns = clock.get_time_ns();
151
152 let now_raw = now_ns.as_u64();
154 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
155
156 if observed_next <= now_raw {
157 loop {
158 match self.next_time_ns.compare_exchange(
159 observed_next,
160 now_raw,
161 atomic::Ordering::SeqCst,
162 atomic::Ordering::SeqCst,
163 ) {
164 Ok(_) => {
165 if observed_next < now_raw {
166 let original = UnixNanos::from(observed_next);
167 log::warn!(
168 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
169 original.to_rfc3339(),
170 );
171 }
172 observed_next = now_raw;
173 break;
174 }
175 Err(actual) => {
176 observed_next = actual;
177 if observed_next > now_raw {
178 break;
179 }
180 }
181 }
182 }
183 }
184
185 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
187 let next_time_atomic = self.next_time_ns.clone();
188 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
189
190 let sender = self.sender.clone();
191
192 let rt = get_runtime();
193 let handle = rt.spawn(async move {
194 let clock = get_atomic_clock_realtime();
195
196 let overhead = Duration::from_millis(1);
198 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
199 let mut delay = Duration::from_nanos(delay_ns);
200
201 if delay > overhead {
203 delay -= overhead;
204 } else {
205 delay = Duration::from_nanos(0);
206 }
207
208 let start = Instant::now() + delay;
209
210 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
211
212 loop {
213 timer.tick().await;
216 let now_ns = clock.get_time_ns();
217
218 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
219
220 match callback {
221 #[cfg(feature = "python")]
222 TimeEventCallback::Python(ref callback) => {
223 call_python_with_time_event(event, callback);
224 }
225 TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
226 debug_assert!(
227 sender.is_some(),
228 "LiveTimer with Rust callback requires TimeEventSender"
229 );
230 let sender = sender
231 .as_ref()
232 .expect("timer event sender was unset for Rust callback system");
233 let handler = TimeEventHandler::new(event, callback.clone());
234 sender.send(handler);
235 }
236 }
237
238 next_time_ns += interval_ns;
240 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
241
242 if let Some(stop_time_ns) = stop_time_ns
244 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
245 {
246 break; }
248 }
249 });
250
251 self.task_handle = Some(handle);
252 }
253
254 pub fn cancel(&mut self) {
258 log::debug!("Cancel timer '{}'", self.name);
259 if let Some(ref handle) = self.task_handle {
260 handle.abort();
261 }
262 }
263}
264
#[cfg(feature = "python")]
/// Calls the Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// An error raised by the callback is printed to stderr and otherwise ignored.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        // The capsule takes ownership of `event`; the no-op destructor closure is
        // handed the value back when the capsule is destroyed (per the pyo3 API).
        let bound_capsule = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`");
        let capsule: Py<PyAny> = bound_capsule.into_py_any_unwrap(py);

        // Only the failure path needs handling; a successful return value is dropped.
        if let Err(e) = callback.call1(py, (capsule,)) {
            eprintln!("{NAUTILUS_PREFIX} Error on callback: {e:?}");
        }
    });
}
288
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
    use rstest::*;
    use ustr::Ustr;

    use super::LiveTimer;
    use crate::{
        runner::TimeEventSender,
        timer::{TimeEventCallback, TimeEventHandler},
    };

    /// Sender that discards every handler it is given.
    #[derive(Debug)]
    struct NoopSender;

    impl TimeEventSender for NoopSender {
        fn send(&self, _handler: TimeEventHandler) {}
    }

    /// Builds a timer with no stop time and a no-op callback.
    fn make_timer(
        name: &str,
        interval_ns: u64,
        start_ns: u64,
        fire_immediately: bool,
        sender: Option<Arc<dyn TimeEventSender>>,
    ) -> LiveTimer {
        LiveTimer::new(
            Ustr::from(name),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_ns),
            None,
            TimeEventCallback::from(|_| {}),
            fire_immediately,
            sender,
        )
    }

    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let timer = make_timer("TEST_TIMER", 1000, 100, true, None);

        assert!(timer.fire_immediately);

        // With immediate firing the first fire time equals the start time
        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let timer = make_timer("TEST_TIMER", 1000, 100, false, None);

        assert!(!timer.fire_immediately);

        // Without immediate firing the first fire time is start + interval
        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    #[rstest]
    fn test_live_timer_adjusts_past_due_start_time() {
        let mut timer = make_timer("PAST_TIMER", 1, 0, true, Some(Arc::new(NoopSender)));

        let before = get_atomic_clock_realtime().get_time_ns();

        timer.start();

        // A start time in the past must have been pulled forward to the current time
        assert!(timer.next_time_ns() >= before);

        timer.cancel();
    }
}