1use std::{
19 num::NonZeroU64,
20 sync::{
21 Arc,
22 atomic::{self, AtomicU64},
23 },
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29 datetime::floor_to_nearest_microsecond,
30 time::get_atomic_clock_realtime,
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, Python};
34use tokio::{
35 task::JoinHandle,
36 time::{Duration, Instant},
37};
38use ustr::Ustr;
39
40use super::runtime::get_runtime;
41use crate::{
42 runner::TimeEventSender,
43 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
44};
45
46#[derive(Debug)]
55pub struct LiveTimer {
56 pub name: Ustr,
58 pub interval_ns: NonZeroU64,
60 pub start_time_ns: UnixNanos,
62 pub stop_time_ns: Option<UnixNanos>,
64 pub fire_immediately: bool,
66 next_time_ns: Arc<AtomicU64>,
67 callback: TimeEventCallback,
68 task_handle: Option<JoinHandle<()>>,
69 sender: Option<Arc<dyn TimeEventSender>>,
70}
71
72impl LiveTimer {
73 #[allow(clippy::too_many_arguments)]
79 #[must_use]
80 pub fn new(
81 name: Ustr,
82 interval_ns: NonZeroU64,
83 start_time_ns: UnixNanos,
84 stop_time_ns: Option<UnixNanos>,
85 callback: TimeEventCallback,
86 fire_immediately: bool,
87 sender: Option<Arc<dyn TimeEventSender>>,
88 ) -> Self {
89 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
90
91 let next_time_ns = if fire_immediately {
92 start_time_ns.as_u64()
93 } else {
94 start_time_ns.as_u64() + interval_ns.get()
95 };
96
97 log::debug!("Creating timer '{name}'");
98
99 Self {
100 name,
101 interval_ns,
102 start_time_ns,
103 stop_time_ns,
104 fire_immediately,
105 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
106 callback,
107 task_handle: None,
108 sender,
109 }
110 }
111
112 #[must_use]
116 pub fn next_time_ns(&self) -> UnixNanos {
117 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
118 }
119
120 #[must_use]
125 pub fn is_expired(&self) -> bool {
126 self.task_handle
127 .as_ref()
128 .is_some_and(tokio::task::JoinHandle::is_finished)
129 }
130
131 #[allow(unused_variables)]
140 pub fn start(&mut self) {
141 let event_name = self.name;
142 let stop_time_ns = self.stop_time_ns;
143 let interval_ns = self.interval_ns.get();
144 let callback = self.callback.clone();
145
146 let clock = get_atomic_clock_realtime();
148 let now_ns = clock.get_time_ns();
149
150 let now_raw = now_ns.as_u64();
152 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
153
154 if observed_next <= now_raw {
155 loop {
156 match self.next_time_ns.compare_exchange(
157 observed_next,
158 now_raw,
159 atomic::Ordering::SeqCst,
160 atomic::Ordering::SeqCst,
161 ) {
162 Ok(_) => {
163 if observed_next < now_raw {
164 let original = UnixNanos::from(observed_next);
165 log::warn!(
166 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
167 original.to_rfc3339(),
168 );
169 }
170 observed_next = now_raw;
171 break;
172 }
173 Err(actual) => {
174 observed_next = actual;
175 if observed_next > now_raw {
176 break;
177 }
178 }
179 }
180 }
181 }
182
183 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
185 let next_time_atomic = self.next_time_ns.clone();
186 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
187
188 let sender = self.sender.clone();
189
190 let rt = get_runtime();
191 let handle = rt.spawn(async move {
192 let clock = get_atomic_clock_realtime();
193
194 let overhead = Duration::from_millis(1);
196 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
197 let mut delay = Duration::from_nanos(delay_ns);
198
199 if delay > overhead {
201 delay -= overhead;
202 } else {
203 delay = Duration::from_nanos(0);
204 }
205
206 let start = Instant::now() + delay;
207
208 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
209
210 loop {
211 timer.tick().await;
214 let now_ns = clock.get_time_ns();
215
216 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
217
218 match callback {
219 #[cfg(feature = "python")]
220 TimeEventCallback::Python(ref callback) => {
221 call_python_with_time_event(event, callback);
222 }
223 TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
224 debug_assert!(
225 sender.is_some(),
226 "LiveTimer with Rust callback requires TimeEventSender"
227 );
228 let sender = sender
229 .as_ref()
230 .expect("timer event sender was unset for Rust callback system");
231 let handler = TimeEventHandlerV2::new(event, callback.clone());
232 sender.send(handler);
233 }
234 }
235
236 next_time_ns += interval_ns;
238 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
239
240 if let Some(stop_time_ns) = stop_time_ns
242 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
243 {
244 break; }
246 }
247 });
248
249 self.task_handle = Some(handle);
250 }
251
252 pub fn cancel(&mut self) {
256 log::debug!("Cancel timer '{}'", self.name);
257 if let Some(ref handle) = self.task_handle {
258 handle.abort();
259 }
260 }
261}
262
263#[cfg(feature = "python")]
264fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
265 use nautilus_core::python::IntoPyObjectNautilusExt;
266 use pyo3::types::PyCapsule;
267
268 Python::attach(|py| {
269 let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
277 .expect("Error creating `PyCapsule`")
278 .into_py_any_unwrap(py);
279
280 match callback.call1(py, (capsule,)) {
281 Ok(_) => {}
282 Err(e) => tracing::error!("Error on callback: {e:?}"),
283 }
284 });
285}
286
287#[cfg(test)]
288mod tests {
289 use std::{num::NonZeroU64, sync::Arc};
290
291 use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
292 use rstest::*;
293 use ustr::Ustr;
294
295 use super::LiveTimer;
296 use crate::{
297 runner::TimeEventSender,
298 timer::{TimeEventCallback, TimeEventHandlerV2},
299 };
300
301 #[rstest]
302 fn test_live_timer_fire_immediately_field() {
303 let timer = LiveTimer::new(
304 Ustr::from("TEST_TIMER"),
305 NonZeroU64::new(1000).unwrap(),
306 UnixNanos::from(100),
307 None,
308 TimeEventCallback::from(|_| {}),
309 true, None, );
312
313 assert!(timer.fire_immediately);
315
316 assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
318 }
319
320 #[rstest]
321 fn test_live_timer_fire_immediately_false_field() {
322 let timer = LiveTimer::new(
323 Ustr::from("TEST_TIMER"),
324 NonZeroU64::new(1000).unwrap(),
325 UnixNanos::from(100),
326 None,
327 TimeEventCallback::from(|_| {}),
328 false, None, );
331
332 assert!(!timer.fire_immediately);
334
335 assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
337 }
338
339 #[rstest]
340 fn test_live_timer_adjusts_past_due_start_time() {
341 #[derive(Debug)]
342 struct NoopSender;
343
344 impl TimeEventSender for NoopSender {
345 fn send(&self, _handler: TimeEventHandlerV2) {}
346 }
347
348 let sender = Arc::new(NoopSender);
349 let mut timer = LiveTimer::new(
350 Ustr::from("PAST_TIMER"),
351 NonZeroU64::new(1).unwrap(),
352 UnixNanos::from(0),
353 None,
354 TimeEventCallback::from(|_| {}),
355 true,
356 Some(sender),
357 );
358
359 let before = get_atomic_clock_realtime().get_time_ns();
360
361 timer.start();
362
363 assert!(timer.next_time_ns() >= before);
364
365 timer.cancel();
366 }
367}