1use std::{
19 num::NonZeroU64,
20 sync::{
21 Arc,
22 atomic::{self, AtomicU64},
23 },
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29 datetime::floor_to_nearest_microsecond,
30 time::get_atomic_clock_realtime,
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, Python};
34use tokio::{
35 task::JoinHandle,
36 time::{Duration, Instant},
37};
38use ustr::Ustr;
39
40use super::runtime::get_runtime;
41use crate::{
42 runner::TimeEventSender,
43 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
44};
45
46#[derive(Debug)]
55pub struct LiveTimer {
56 pub name: Ustr,
58 pub interval_ns: NonZeroU64,
60 pub start_time_ns: UnixNanos,
62 pub stop_time_ns: Option<UnixNanos>,
64 pub fire_immediately: bool,
66 next_time_ns: Arc<AtomicU64>,
67 callback: TimeEventCallback,
68 task_handle: Option<JoinHandle<()>>,
69 sender: Option<Arc<dyn TimeEventSender>>,
70}
71
72impl LiveTimer {
73 #[allow(clippy::too_many_arguments)]
79 #[must_use]
80 pub fn new(
81 name: Ustr,
82 interval_ns: NonZeroU64,
83 start_time_ns: UnixNanos,
84 stop_time_ns: Option<UnixNanos>,
85 callback: TimeEventCallback,
86 fire_immediately: bool,
87 sender: Option<Arc<dyn TimeEventSender>>,
88 ) -> Self {
89 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
90
91 let next_time_ns = if fire_immediately {
92 start_time_ns.as_u64()
93 } else {
94 start_time_ns.as_u64() + interval_ns.get()
95 };
96
97 log::debug!("Creating timer '{name}'");
98
99 Self {
100 name,
101 interval_ns,
102 start_time_ns,
103 stop_time_ns,
104 fire_immediately,
105 next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
106 callback,
107 task_handle: None,
108 sender,
109 }
110 }
111
112 #[must_use]
116 pub fn next_time_ns(&self) -> UnixNanos {
117 UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
118 }
119
120 #[must_use]
125 pub fn is_expired(&self) -> bool {
126 self.task_handle
127 .as_ref()
128 .is_some_and(tokio::task::JoinHandle::is_finished)
129 }
130
131 #[allow(unused_variables)]
143 pub fn start(&mut self) {
144 #[cfg(not(test))]
149 assert!(
150 !self.callback.is_rust(),
151 "LiveTimer cannot use Rust callbacks (they are not thread-safe). \
152 Use Python callbacks for live trading, or TestClock for backtesting."
153 );
154
155 let event_name = self.name;
156 let stop_time_ns = self.stop_time_ns;
157 let interval_ns = self.interval_ns.get();
158 let callback = self.callback.clone();
159
160 let clock = get_atomic_clock_realtime();
162 let now_ns = clock.get_time_ns();
163
164 let now_raw = now_ns.as_u64();
166 let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
167
168 if observed_next <= now_raw {
169 loop {
170 match self.next_time_ns.compare_exchange(
171 observed_next,
172 now_raw,
173 atomic::Ordering::SeqCst,
174 atomic::Ordering::SeqCst,
175 ) {
176 Ok(_) => {
177 if observed_next < now_raw {
178 let original = UnixNanos::from(observed_next);
179 log::warn!(
180 "Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
181 original.to_rfc3339(),
182 );
183 }
184 observed_next = now_raw;
185 break;
186 }
187 Err(actual) => {
188 observed_next = actual;
189 if observed_next > now_raw {
190 break;
191 }
192 }
193 }
194 }
195 }
196
197 let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
199 let next_time_atomic = self.next_time_ns.clone();
200 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
201
202 let sender = self.sender.clone();
203
204 let rt = get_runtime();
205 let handle = rt.spawn(async move {
206 let clock = get_atomic_clock_realtime();
207
208 let overhead = Duration::from_millis(1);
210 let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
211 let mut delay = Duration::from_nanos(delay_ns);
212
213 if delay > overhead {
215 delay -= overhead;
216 } else {
217 delay = Duration::from_nanos(0);
218 }
219
220 let start = Instant::now() + delay;
221
222 let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
223
224 loop {
225 timer.tick().await;
228 let now_ns = clock.get_time_ns();
229
230 let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
231
232 match callback {
233 #[cfg(feature = "python")]
234 TimeEventCallback::Python(ref callback) => {
235 call_python_with_time_event(event, callback);
236 }
237 TimeEventCallback::Rust(_) => {
238 debug_assert!(
239 sender.is_some(),
240 "LiveTimer with Rust callback requires TimeEventSender"
241 );
242 let sender = sender
243 .as_ref()
244 .expect("timer event sender was unset for Rust callback system");
245 let handler = TimeEventHandlerV2::new(event, callback.clone());
246 sender.send(handler);
247 }
248 }
249
250 next_time_ns += interval_ns;
252 next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
253
254 if let Some(stop_time_ns) = stop_time_ns
256 && std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
257 {
258 break; }
260 }
261 });
262
263 self.task_handle = Some(handle);
264 }
265
266 pub fn cancel(&mut self) {
270 log::debug!("Cancel timer '{}'", self.name);
271 if let Some(ref handle) = self.task_handle {
272 handle.abort();
273 }
274 }
275}
276
/// Invokes a Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// The capsule takes ownership of `event`; the destructor closure passed
/// alongside it is a no-op. An error raised by the callback is logged and
/// otherwise ignored, so a failing callback does not stop the calling timer.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        let capsule: Py<PyAny> = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`")
            .into_py_any_unwrap(py);

        // Logged through `log` like the rest of this file, so callback
        // failures end up in the same sink as the other timer messages.
        if let Err(e) = callback.call1(py, (capsule,)) {
            log::error!("Error on callback: {e:?}");
        }
    });
}
300
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
    use rstest::*;
    use ustr::Ustr;

    use super::LiveTimer;
    use crate::{
        runner::TimeEventSender,
        timer::{TimeEventCallback, TimeEventHandlerV2},
    };

    /// With `fire_immediately` set, the first fire time equals the start time.
    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let interval_ns = NonZeroU64::new(1000).unwrap();
        let start_time_ns = UnixNanos::from(100);

        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time_ns,
            None,
            TimeEventCallback::from(|_| {}),
            true,
            None,
        );

        assert!(timer.fire_immediately);

        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    /// Without `fire_immediately`, the first fire time is one interval after start.
    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let interval_ns = NonZeroU64::new(1000).unwrap();
        let start_time_ns = UnixNanos::from(100);

        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            interval_ns,
            start_time_ns,
            None,
            TimeEventCallback::from(|_| {}),
            false,
            None,
        );

        assert!(!timer.fire_immediately);

        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    /// Starting a timer whose fire time is in the past moves it up to "now".
    #[rstest]
    fn test_live_timer_adjusts_past_due_start_time() {
        /// Sender that discards every handler it is given.
        #[derive(Debug)]
        struct NoopSender;

        impl TimeEventSender for NoopSender {
            fn send(&self, _handler: TimeEventHandlerV2) {}
        }

        let sender: Arc<dyn TimeEventSender> = Arc::new(NoopSender);
        let mut timer = LiveTimer::new(
            Ustr::from("PAST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::from(0),
            None,
            TimeEventCallback::from(|_| {}),
            true,
            Some(sender),
        );

        // Sampled before `start` so the adjusted fire time cannot precede it.
        let before = get_atomic_clock_realtime().get_time_ns();

        timer.start();

        assert!(timer.next_time_ns() >= before);

        timer.cancel();
    }
}