nautilus_core/
time.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `AtomicTime` for real-time and static clocks.
17//!
18//! This module provides an atomic time abstraction that supports both real-time and static
19//! clocks. It ensures thread-safe operations and monotonic time retrieval with nanosecond precision.
20//!
21//! # Modes
22//!
23//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
24//!   [`SystemTime::now()`]). To ensure strict monotonic increments across multiple threads,
25//!   the internal updates use an atomic compare-and-exchange loop (`time_since_epoch`).
26//!   While this guarantees that every new timestamp is at least one nanosecond greater than the
27//!   last, it may introduce higher contention if many threads call it heavily.
28//!
29//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
30//!   which can be useful for simulations or backtesting. You can switch modes at runtime using
31//!   [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`]. In **static mode**, we use
32//!   acquire/release semantics so that updates from one thread can be observed by another;
33//!   however, we do not enforce strict global ordering for manual updates. If you need strong,
34//!   multi-threaded ordering in **static mode**, you must coordinate higher-level synchronization yourself.
35
36use std::{
37    ops::Deref,
38    sync::{
39        OnceLock,
40        atomic::{AtomicBool, AtomicU64, Ordering},
41    },
42    time::{Duration, SystemTime, UNIX_EPOCH},
43};
44
45use crate::{
46    UnixNanos,
47    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
48};
49
50/// Global atomic time in **real-time mode** for use across the system.
51///
52/// This clock operates in **real-time mode**, synchronizing with the system clock.
53/// It provides globally unique, strictly increasing timestamps across threads.
54pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();
55
56/// Global atomic time in **static mode** for use across the system.
57///
58/// This clock operates in **static mode**, where the time value can be set or incremented
59/// manually. Useful for backtesting or simulated time control.
60pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();
61
62/// Returns a static reference to the global atomic clock in **real-time mode**.
63///
64/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
65/// timestamps across threads.
66pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
67    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
68}
69
70/// Returns a static reference to the global atomic clock in **static mode**.
71///
72/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
73/// and does not automatically sync with system time.
74pub fn get_atomic_clock_static() -> &'static AtomicTime {
75    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
76}
77
78/// Returns the duration since the UNIX epoch based on [`SystemTime::now()`].
79///
80/// Panics if the system time is set before the UNIX epoch.
81#[inline(always)]
82#[must_use]
83pub fn duration_since_unix_epoch() -> Duration {
84    SystemTime::now()
85        .duration_since(UNIX_EPOCH)
86        .expect("Error calling `SystemTime`")
87}
88
89/// Returns the current UNIX time in nanoseconds, based on [`SystemTime::now()`].
90#[inline(always)]
91#[must_use]
92pub fn nanos_since_unix_epoch() -> u64 {
93    duration_since_unix_epoch().as_nanos() as u64
94}
95
96/// Represents an atomic timekeeping structure.
97///
98/// [`AtomicTime`] can act as a real-time clock or static clock based on its mode.
99/// It uses an [`AtomicU64`] to atomically update the value using only immutable
100/// references.
101///
102/// The `realtime` flag indicates which mode the clock is currently in.
103/// For concurrency, this struct uses atomic operations with appropriate memory orderings:
104/// - **Acquire/Release** for reading/writing in **static mode**,
105/// - **Compare-and-exchange (`AcqRel`)** in real-time mode to guarantee monotonic increments.
106#[repr(C)]
107#[derive(Debug)]
108pub struct AtomicTime {
109    /// Indicates whether the clock is operating in **real-time mode** (`true`) or **static mode** (`false`)
110    pub realtime: AtomicBool,
111    /// The last recorded time (in UNIX nanoseconds). Updated atomically with compare-and-exchange
112    /// in **real-time mode**, or simple store/fetch in **static mode**.
113    pub timestamp_ns: AtomicU64,
114}
115
116impl Deref for AtomicTime {
117    type Target = AtomicU64;
118
119    fn deref(&self) -> &Self::Target {
120        &self.timestamp_ns
121    }
122}
123
124impl Default for AtomicTime {
125    /// Creates a new default [`AtomicTime`] instance in **real-time mode**, starting at the current system time.
126    fn default() -> Self {
127        Self::new(true, UnixNanos::default())
128    }
129}
130
131impl AtomicTime {
132    /// Creates a new [`AtomicTime`] instance.
133    ///
134    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
135    ///   and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
136    /// - If `realtime` is `false`, this clock starts in **static mode**, with the given `time`
137    ///   as its current value.
138    #[must_use]
139    pub fn new(realtime: bool, time: UnixNanos) -> Self {
140        Self {
141            realtime: AtomicBool::new(realtime),
142            timestamp_ns: AtomicU64::new(time.into()),
143        }
144    }
145
146    /// Returns the current time in nanoseconds, based on the clock’s mode.
147    ///
148    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
149    ///   timestamps across threads, using `AcqRel` semantics for the underlying atomic.
150    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
151    ///   threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
152    ///   will be visible here.
153    #[must_use]
154    pub fn get_time_ns(&self) -> UnixNanos {
155        if self.realtime.load(Ordering::Acquire) {
156            self.time_since_epoch()
157        } else {
158            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
159        }
160    }
161
162    /// Return the current time as microseconds.
163    #[must_use]
164    pub fn get_time_us(&self) -> u64 {
165        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
166    }
167
168    /// Return the current time as milliseconds.
169    #[must_use]
170    pub fn get_time_ms(&self) -> u64 {
171        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
172    }
173
174    /// Return the current time as seconds.
175    #[must_use]
176    pub fn get_time(&self) -> f64 {
177        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
178    }
179
180    /// Manually sets a new time for the clock (only meaningful in **static mode**).
181    ///
182    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
183    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
184    /// among all threads, but is enough to ensure that once a thread sees this update, it also
185    /// sees all writes made before this call in the writing thread.
186    ///
187    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
188    /// since there’s no global ordering across threads.
189    pub fn set_time(&self, time: UnixNanos) {
190        self.store(time.into(), Ordering::Release);
191    }
192
193    /// Increments the current time by `delta` nanoseconds and returns the *updated* value
194    /// (only meaningful in **static mode**).
195    ///
196    /// Uses `fetch_add` with [`Ordering::AcqRel`], ensuring that:
197    /// - The increment is atomic (no lost updates if multiple threads do increments).
198    /// - Other threads reading with [`Ordering::Acquire`] will see the incremented result.
199    ///
200    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
201    /// since there’s no global ordering across threads.
202    pub fn increment_time(&self, delta: u64) -> UnixNanos {
203        UnixNanos::from(self.fetch_add(delta, Ordering::AcqRel) + delta)
204    }
205
206    /// Retrieves and updates the current “real-time” clock, returning a strictly increasing
207    /// timestamp based on system time.
208    ///
209    /// Internally:
210    /// - We fetch `now` from [`SystemTime::now()`].
211    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
212    ///   timestamp is never less than the last timestamp.
213    ///
214    /// This ensures:
215    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
216    ///    one (by at least 1 nanosecond).
217    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
218    ///    monotonicity.
219    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
220    ///    once this compare-and-exchange completes.
221    ///
222    /// Note that under heavy contention (many threads calling this in tight loops), the CAS loop
223    /// may increase latency. If you need extremely high-frequency, concurrent updates, consider
224    /// using a more specialized approach or relaxing some ordering requirements.
225    pub fn time_since_epoch(&self) -> UnixNanos {
226        // This method guarantees strict consistency but may incur a performance cost under
227        // high contention due to retries in the `compare_exchange` loop.
228        let now = nanos_since_unix_epoch();
229        loop {
230            // Acquire to observe the latest stored value
231            let last = self.load(Ordering::Acquire);
232            let next = now.max(last + 1);
233            // AcqRel on success ensures this new value is published,
234            // Acquire on failure reloads if we lost a CAS race.
235            match self.compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire) {
236                Ok(_) => return UnixNanos::from(next),
237                Err(_) => continue,
238            }
239        }
240    }
241
242    /// Switches the clock to **real-time mode** (`realtime = true`).
243    ///
244    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
245    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
246    /// Typically, switching modes is done infrequently, so the performance impact of `SeqCst`
247    /// here is acceptable.
248    pub fn make_realtime(&self) {
249        self.realtime.store(true, Ordering::SeqCst);
250    }
251
252    /// Switches the clock to **static mode** (`realtime = false`).
253    ///
254    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
255    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
256    pub fn make_static(&self) {
257        self.realtime.store(false, Ordering::SeqCst);
258    }
259}
260
261////////////////////////////////////////////////////////////////////////////////
262// Tests
263////////////////////////////////////////////////////////////////////////////////
264#[cfg(test)]
265mod tests {
266    use std::sync::Arc;
267
268    use rstest::*;
269
270    use super::*;
271
272    #[rstest]
273    fn test_global_clocks_initialization() {
274        let realtime_clock = get_atomic_clock_realtime();
275        assert!(realtime_clock.get_time_ns().as_u64() > 0);
276
277        let static_clock = get_atomic_clock_static();
278        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
279        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
280    }
281
282    #[rstest]
283    fn test_mode_switching() {
284        let time = AtomicTime::new(true, UnixNanos::default());
285
286        // Verify real-time mode
287        let realtime_ns = time.get_time_ns();
288        assert!(realtime_ns.as_u64() > 0);
289
290        // Switch to static mode
291        time.make_static();
292        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
293        let static_ns = time.get_time_ns();
294        assert_eq!(static_ns.as_u64(), 1_000_000_000);
295
296        // Switch back to real-time mode
297        time.make_realtime();
298        let new_realtime_ns = time.get_time_ns();
299        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
300    }
301
302    #[rstest]
303    fn test_mode_switching_concurrent() {
304        let clock = Arc::new(AtomicTime::new(true, UnixNanos::default()));
305        let num_threads = 4;
306        let iterations = 10000;
307        let mut handles = Vec::with_capacity(num_threads);
308
309        for _ in 0..num_threads {
310            let clock_clone = Arc::clone(&clock);
311            let handle = std::thread::spawn(move || {
312                for i in 0..iterations {
313                    if i % 2 == 0 {
314                        clock_clone.make_static();
315                    } else {
316                        clock_clone.make_realtime();
317                    }
318                    // Retrieve the time; we’re not asserting a particular value here,
319                    // but at least we’re exercising the mode switch logic under concurrency.
320                    let _ = clock_clone.get_time_ns();
321                }
322            });
323            handles.push(handle);
324        }
325
326        for handle in handles {
327            handle.join().unwrap();
328        }
329    }
330
331    #[rstest]
332    fn test_static_time_is_stable() {
333        // Create a clock in static mode with an initial value
334        let clock = AtomicTime::new(false, UnixNanos::from(42));
335        let time1 = clock.get_time_ns();
336
337        // Sleep a bit to give the system time to change, if the clock were using real-time
338        std::thread::sleep(std::time::Duration::from_millis(10));
339        let time2 = clock.get_time_ns();
340
341        // In static mode, the value should remain unchanged
342        assert_eq!(time1, time2);
343    }
344
345    #[rstest]
346    fn test_increment_time() {
347        // Start in static mode
348        let time = AtomicTime::new(false, UnixNanos::from(0));
349
350        let updated_time = time.increment_time(500);
351        assert_eq!(updated_time.as_u64(), 500);
352
353        let updated_time = time.increment_time(1_000);
354        assert_eq!(updated_time.as_u64(), 1_500);
355    }
356
357    #[rstest]
358    fn test_nanos_since_unix_epoch_vs_system_time() {
359        let unix_nanos = nanos_since_unix_epoch();
360        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
361        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
362    }
363
364    #[rstest]
365    fn test_time_since_epoch_monotonicity() {
366        let clock = get_atomic_clock_realtime();
367        let mut previous = clock.time_since_epoch();
368        for _ in 0..1_000_000 {
369            let current = clock.time_since_epoch();
370            assert!(current > previous);
371            previous = current;
372        }
373    }
374
375    #[rstest]
376    fn test_time_since_epoch_strictly_increasing_concurrent() {
377        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
378        let num_threads = 4;
379        let iterations = 100_000;
380        let mut handles = Vec::with_capacity(num_threads);
381
382        for thread_id in 0..num_threads {
383            let time_clone = Arc::clone(&time);
384
385            let handle = std::thread::spawn(move || {
386                let mut previous = time_clone.time_since_epoch().as_u64();
387
388                for i in 0..iterations {
389                    let current = time_clone.time_since_epoch().as_u64();
390                    assert!(
391                        current > previous,
392                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
393                    );
394                    previous = current;
395                }
396            });
397
398            handles.push(handle);
399        }
400
401        for handle in handles {
402            handle.join().unwrap();
403        }
404    }
405
406    #[rstest]
407    fn test_duration_since_unix_epoch() {
408        let time = AtomicTime::new(true, UnixNanos::default());
409        let duration = Duration::from_nanos(time.get_time_ns().into());
410        let now = SystemTime::now();
411
412        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
413        let delta = now
414            .duration_since(UNIX_EPOCH)
415            .unwrap()
416            .checked_sub(duration);
417        assert!(delta.unwrap_or_default() < Duration::from_millis(100));
418
419        // Check if the duration is greater than a certain value (assuming the test is run after that point)
420        assert!(duration > Duration::from_secs(1_650_000_000));
421    }
422
423    #[rstest]
424    fn test_unix_timestamp_is_monotonic_increasing() {
425        let time = AtomicTime::new(true, UnixNanos::default());
426        let result1 = time.get_time();
427        let result2 = time.get_time();
428        let result3 = time.get_time();
429        let result4 = time.get_time();
430        let result5 = time.get_time();
431
432        assert!(result2 >= result1);
433        assert!(result3 >= result2);
434        assert!(result4 >= result3);
435        assert!(result5 >= result4);
436        assert!(result1 > 1_650_000_000.0);
437    }
438
439    #[rstest]
440    fn test_unix_timestamp_ms_is_monotonic_increasing() {
441        let time = AtomicTime::new(true, UnixNanos::default());
442        let result1 = time.get_time_ms();
443        let result2 = time.get_time_ms();
444        let result3 = time.get_time_ms();
445        let result4 = time.get_time_ms();
446        let result5 = time.get_time_ms();
447
448        assert!(result2 >= result1);
449        assert!(result3 >= result2);
450        assert!(result4 >= result3);
451        assert!(result5 >= result4);
452        assert!(result1 >= 1_650_000_000_000);
453    }
454
455    #[rstest]
456    fn test_unix_timestamp_us_is_monotonic_increasing() {
457        let time = AtomicTime::new(true, UnixNanos::default());
458        let result1 = time.get_time_us();
459        let result2 = time.get_time_us();
460        let result3 = time.get_time_us();
461        let result4 = time.get_time_us();
462        let result5 = time.get_time_us();
463
464        assert!(result2 >= result1);
465        assert!(result3 >= result2);
466        assert!(result4 >= result3);
467        assert!(result5 >= result4);
468        assert!(result1 > 1_650_000_000_000_000);
469    }
470
471    #[rstest]
472    fn test_unix_timestamp_ns_is_monotonic_increasing() {
473        let time = AtomicTime::new(true, UnixNanos::default());
474        let result1 = time.get_time_ns();
475        let result2 = time.get_time_ns();
476        let result3 = time.get_time_ns();
477        let result4 = time.get_time_ns();
478        let result5 = time.get_time_ns();
479
480        assert!(result2 >= result1);
481        assert!(result3 >= result2);
482        assert!(result4 >= result3);
483        assert!(result5 >= result4);
484        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
485    }
486}