1use std::{
19 collections::BinaryHeap,
20 ops::Deref,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25
26use ahash::AHashMap;
27use futures::Stream;
28use nautilus_core::{
29 AtomicTime, UnixNanos, correctness::check_predicate_true, time::get_atomic_clock_realtime,
30};
31use ustr::Ustr;
32
33use super::timer::LiveTimer;
34use crate::{
35 clock::{CallbackRegistry, Clock, validate_and_prepare_time_alert, validate_and_prepare_timer},
36 runner::{TimeEventSender, try_get_time_event_sender},
37 timer::{
38 ScheduledTimeEvent, TimeEvent, TimeEventCallback, TimeEventHandlerV2, create_valid_interval,
39 },
40};
41
42#[derive(Debug)]
50pub struct LiveClock {
51 time: &'static AtomicTime,
52 timers: AHashMap<Ustr, LiveTimer>,
53 callbacks: CallbackRegistry,
54 sender: Option<Arc<dyn TimeEventSender>>,
55}
56
57impl LiveClock {
58 #[must_use]
60 pub fn new(sender: Option<Arc<dyn TimeEventSender>>) -> Self {
61 Self {
62 time: get_atomic_clock_realtime(),
63 timers: AHashMap::new(),
64 callbacks: CallbackRegistry::new(),
65 sender,
66 }
67 }
68
69 #[must_use]
70 pub const fn get_timers(&self) -> &AHashMap<Ustr, LiveTimer> {
71 &self.timers
72 }
73
74 fn clear_expired_timers(&mut self) {
75 self.timers.retain(|_, timer| !timer.is_expired());
76 }
77
78 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
79 if self.timer_exists(name) {
80 self.cancel_timer(name.as_str());
81 log::warn!("Timer '{name}' replaced");
82 }
83 }
84}
85
86impl Default for LiveClock {
87 fn default() -> Self {
91 Self::new(try_get_time_event_sender())
92 }
93}
94
95impl Deref for LiveClock {
96 type Target = AtomicTime;
97
98 fn deref(&self) -> &Self::Target {
99 self.time
100 }
101}
102
103impl Clock for LiveClock {
104 fn timestamp_ns(&self) -> UnixNanos {
105 self.time.get_time_ns()
106 }
107
108 fn timestamp_us(&self) -> u64 {
109 self.time.get_time_us()
110 }
111
112 fn timestamp_ms(&self) -> u64 {
113 self.time.get_time_ms()
114 }
115
116 fn timestamp(&self) -> f64 {
117 self.time.get_time()
118 }
119
120 fn timer_names(&self) -> Vec<&str> {
121 self.timers
122 .iter()
123 .filter(|(_, timer)| !timer.is_expired())
124 .map(|(k, _)| k.as_str())
125 .collect()
126 }
127
128 fn timer_count(&self) -> usize {
129 self.timers
130 .iter()
131 .filter(|(_, timer)| !timer.is_expired())
132 .count()
133 }
134
135 fn timer_exists(&self, name: &Ustr) -> bool {
136 self.timers.contains_key(name)
137 }
138
139 fn register_default_handler(&mut self, handler: TimeEventCallback) {
140 self.callbacks.register_default_handler(handler);
141 }
142
143 #[allow(unused_variables)]
148 fn get_handler(&self, event: TimeEvent) -> TimeEventHandlerV2 {
149 self.callbacks.get_handler(event)
150 }
151
152 fn set_time_alert_ns(
153 &mut self,
154 name: &str,
155 alert_time_ns: UnixNanos,
156 callback: Option<TimeEventCallback>,
157 allow_past: Option<bool>,
158 ) -> anyhow::Result<()> {
159 let ts_now = self.get_time_ns();
160 let (name, alert_time_ns) =
161 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
162
163 self.replace_existing_timer_if_needed(&name);
164
165 check_predicate_true(
166 callback.is_some() | self.callbacks.has_any_callback(&name),
167 "No callbacks provided",
168 )?;
169
170 let callback = if let Some(callback) = callback {
171 self.callbacks.register_callback(name, callback.clone());
172 callback
173 } else {
174 self.callbacks
175 .get_callback(&name)
176 .expect("Callback should exist")
177 };
178
179 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
181
182 let mut timer = LiveTimer::new(
183 name,
184 interval_ns,
185 ts_now,
186 Some(alert_time_ns),
187 callback,
188 false,
189 self.sender.clone(),
190 );
191
192 timer.start();
193
194 self.clear_expired_timers();
195 self.timers.insert(name, timer);
196
197 Ok(())
198 }
199
200 fn set_timer_ns(
201 &mut self,
202 name: &str,
203 interval_ns: u64,
204 start_time_ns: Option<UnixNanos>,
205 stop_time_ns: Option<UnixNanos>,
206 callback: Option<TimeEventCallback>,
207 allow_past: Option<bool>,
208 fire_immediately: Option<bool>,
209 ) -> anyhow::Result<()> {
210 let ts_now = self.get_time_ns();
211 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
212 validate_and_prepare_timer(
213 name,
214 interval_ns,
215 start_time_ns,
216 stop_time_ns,
217 allow_past,
218 fire_immediately,
219 ts_now,
220 )?;
221
222 check_predicate_true(
223 callback.is_some() | self.callbacks.has_any_callback(&name),
224 "No callbacks provided",
225 )?;
226
227 self.replace_existing_timer_if_needed(&name);
228
229 let callback = if let Some(callback) = callback {
230 self.callbacks.register_callback(name, callback.clone());
231 callback
232 } else {
233 self.callbacks
234 .get_callback(&name)
235 .expect("Callback should exist")
236 };
237
238 let interval_ns = create_valid_interval(interval_ns);
239
240 let mut timer = LiveTimer::new(
241 name,
242 interval_ns,
243 start_time_ns,
244 stop_time_ns,
245 callback,
246 fire_immediately,
247 self.sender.clone(),
248 );
249 timer.start();
250
251 self.clear_expired_timers();
252 self.timers.insert(name, timer);
253
254 Ok(())
255 }
256
257 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
258 self.timers
259 .get(&Ustr::from(name))
260 .map(|timer| timer.next_time_ns())
261 }
262
263 fn cancel_timer(&mut self, name: &str) {
264 let timer = self.timers.remove(&Ustr::from(name));
265 if let Some(mut timer) = timer {
266 timer.cancel();
267 }
268 }
269
270 fn cancel_timers(&mut self) {
271 for timer in &mut self.timers.values_mut() {
272 timer.cancel();
273 }
274
275 self.timers.clear();
276 }
277
278 fn reset(&mut self) {
279 self.cancel_timers();
280 self.callbacks.clear();
281 }
282}
283
284#[derive(Debug)]
286pub struct TimeEventStream {
287 heap: Arc<tokio::sync::Mutex<BinaryHeap<ScheduledTimeEvent>>>,
288}
289
290impl TimeEventStream {
291 pub const fn new(heap: Arc<tokio::sync::Mutex<BinaryHeap<ScheduledTimeEvent>>>) -> Self {
292 Self { heap }
293 }
294}
295
296impl Stream for TimeEventStream {
297 type Item = TimeEvent;
298
299 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
300 let mut heap = match self.heap.try_lock() {
301 Ok(guard) => guard,
302 Err(e) => {
303 tracing::error!("Unable to get LiveClock heap lock: {e}");
304 cx.waker().wake_by_ref();
305 return Poll::Pending;
306 }
307 };
308
309 if let Some(event) = heap.pop() {
310 Poll::Ready(Some(event.into_inner()))
311 } else {
312 cx.waker().wake_by_ref();
313 Poll::Pending
314 }
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use std::{
321 sync::{Arc, Mutex},
322 time::Duration,
323 };
324
325 use nautilus_core::{MUTEX_POISONED, UnixNanos, time::get_atomic_clock_realtime};
326 use rstest::rstest;
327 use ustr::Ustr;
328
329 use super::*;
330 use crate::{
331 clock::Clock,
332 runner::TimeEventSender,
333 testing::wait_until,
334 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
335 };
336
337 #[derive(Debug)]
338 struct CollectingSender {
339 events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
340 }
341
342 impl CollectingSender {
343 fn new(events: Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>) -> Self {
344 Self { events }
345 }
346 }
347
348 impl TimeEventSender for CollectingSender {
349 fn send(&self, handler: TimeEventHandlerV2) {
350 let TimeEventHandlerV2 { event, callback } = handler;
351 let now_ns = get_atomic_clock_realtime().get_time_ns();
352 let event_clone = event.clone();
353 callback.call(event);
354 self.events
355 .lock()
356 .expect(MUTEX_POISONED)
357 .push((event_clone, now_ns));
358 }
359 }
360
361 fn wait_for_events(
362 events: &Arc<Mutex<Vec<(TimeEvent, UnixNanos)>>>,
363 target: usize,
364 timeout: Duration,
365 ) {
366 wait_until(
367 || events.lock().expect(MUTEX_POISONED).len() >= target,
368 timeout,
369 );
370 }
371
372 #[rstest]
373 fn test_live_clock_timer_replacement_cancels_previous_task() {
374 let events = Arc::new(Mutex::new(Vec::new()));
375 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
376
377 let mut clock = LiveClock::new(Some(sender));
378 clock.register_default_handler(TimeEventCallback::from(|_| {}));
379
380 let fast_interval = Duration::from_millis(10).as_nanos() as u64;
381 clock
382 .set_timer_ns("replace", fast_interval, None, None, None, None, None)
383 .unwrap();
384
385 wait_for_events(&events, 2, Duration::from_millis(200));
386 events.lock().expect(MUTEX_POISONED).clear();
387
388 let slow_interval = Duration::from_millis(30).as_nanos() as u64;
389 clock
390 .set_timer_ns("replace", slow_interval, None, None, None, None, None)
391 .unwrap();
392
393 wait_for_events(&events, 3, Duration::from_millis(300));
394
395 let snapshot = events.lock().expect(MUTEX_POISONED).clone();
396 let diffs: Vec<u64> = snapshot
397 .windows(2)
398 .map(|pair| pair[1].0.ts_event.as_u64() - pair[0].0.ts_event.as_u64())
399 .collect();
400
401 assert!(!diffs.is_empty());
402 for diff in diffs {
403 assert_ne!(diff, fast_interval);
404 }
405
406 clock.cancel_timers();
407 }
408
409 #[rstest]
410 fn test_live_clock_time_alert_persists_callback() {
411 let events = Arc::new(Mutex::new(Vec::new()));
412 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
413
414 let mut clock = LiveClock::new(Some(sender));
415 clock.register_default_handler(TimeEventCallback::from(|_| {}));
416
417 let now = clock.timestamp_ns();
418 let alert_time = now + 1_000_u64;
419
420 clock
421 .set_time_alert_ns("alert-callback", alert_time, None, None)
422 .unwrap();
423
424 assert!(
425 clock
426 .callbacks
427 .has_any_callback(&Ustr::from("alert-callback"))
428 );
429
430 clock.cancel_timers();
431 }
432
433 #[rstest]
434 fn test_live_clock_reset_stops_active_timers() {
435 let events = Arc::new(Mutex::new(Vec::new()));
436 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
437
438 let mut clock = LiveClock::new(Some(sender));
439 clock.register_default_handler(TimeEventCallback::from(|_| {}));
440
441 clock
442 .set_timer_ns(
443 "reset-test",
444 Duration::from_millis(15).as_nanos() as u64,
445 None,
446 None,
447 None,
448 None,
449 None,
450 )
451 .unwrap();
452
453 wait_for_events(&events, 2, Duration::from_millis(250));
454
455 clock.reset();
456
457 let start = std::time::Instant::now();
459 wait_until(
460 || start.elapsed() >= Duration::from_millis(50),
461 Duration::from_secs(2),
462 );
463
464 events.lock().expect(MUTEX_POISONED).clear();
466
467 let start = std::time::Instant::now();
469 wait_until(
470 || start.elapsed() >= Duration::from_millis(50),
471 Duration::from_secs(2),
472 );
473 assert!(events.lock().expect(MUTEX_POISONED).is_empty());
474 }
475
476 #[rstest]
477 fn test_live_timer_short_delay_not_early() {
478 let events = Arc::new(Mutex::new(Vec::new()));
479 let sender = Arc::new(CollectingSender::new(Arc::clone(&events)));
480
481 let mut clock = LiveClock::new(Some(sender));
482 clock.register_default_handler(TimeEventCallback::from(|_| {}));
483
484 let now = clock.timestamp_ns();
485 let start_time = UnixNanos::from(*now + 500_000); let interval_ns = 1_000_000;
487
488 clock
489 .set_timer_ns(
490 "short-delay",
491 interval_ns,
492 Some(start_time),
493 None,
494 None,
495 None,
496 Some(true),
497 )
498 .unwrap();
499
500 wait_for_events(&events, 1, Duration::from_millis(100));
501
502 let snapshot = events.lock().expect(MUTEX_POISONED).clone();
503 assert!(!snapshot.is_empty());
504
505 for (event, actual_ts) in &snapshot {
506 assert!(actual_ts.as_u64() >= event.ts_event.as_u64());
507 }
508
509 clock.cancel_timers();
510 }
511}