// nautilus_core/time.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `AtomicTime` for real-time and static clocks.
17//!
18//! This module provides an atomic time abstraction that supports both real-time and static
19//! clocks. It ensures thread-safe operations and monotonic time retrieval with nanosecond precision.
20//!
21//! # Modes
22//!
23//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
24//!   [`SystemTime::now()`]). To ensure strict monotonic increments across multiple threads,
25//!   the internal updates use an atomic compare-and-exchange loop (`time_since_epoch`).
26//!   While this guarantees that every new timestamp is at least one nanosecond greater than the
27//!   last, it may introduce higher contention if many threads call it heavily.
28//!
29//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
30//!   which can be useful for simulations or backtesting. You can switch modes at runtime using
31//!   [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`]. In **static mode**, we use
32//!   acquire/release semantics so that updates from one thread can be observed by another;
33//!   however, we do not enforce strict global ordering for manual updates. If you need strong,
34//!   multi-threaded ordering in **static mode**, you must coordinate higher-level synchronization yourself.
35
36use std::{
37    ops::Deref,
38    sync::{
39        OnceLock,
40        atomic::{AtomicBool, AtomicU64, Ordering},
41    },
42    time::{Duration, SystemTime, UNIX_EPOCH},
43};
44
45use crate::{
46    UnixNanos,
47    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
48};
49
/// Global atomic time in **real-time mode** for use across the system.
///
/// This clock operates in **real-time mode**, synchronizing with the system clock.
/// It provides globally unique, strictly increasing timestamps across threads.
///
/// Lazily initialized on first access via [`get_atomic_clock_realtime`].
pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();
55
/// Global atomic time in **static mode** for use across the system.
///
/// This clock operates in **static mode**, where the time value can be set or incremented
/// manually. Useful for backtesting or simulated time control.
///
/// Lazily initialized on first access via [`get_atomic_clock_static`].
pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();
61
62/// Returns a static reference to the global atomic clock in **real-time mode**.
63///
64/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
65/// timestamps across threads.
66pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
67    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
68}
69
70/// Returns a static reference to the global atomic clock in **static mode**.
71///
72/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
73/// and does not automatically sync with system time.
74pub fn get_atomic_clock_static() -> &'static AtomicTime {
75    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
76}
77
/// Returns the duration since the UNIX epoch based on [`SystemTime::now()`].
///
/// # Panics
///
/// Panics if the system time is set before the UNIX epoch.
#[inline(always)]
#[must_use]
pub fn duration_since_unix_epoch() -> Duration {
    // The expect() is deliberate: a failure here means the OS clock reports a moment
    // before the UNIX epoch — a catastrophic misconfiguration under which no
    // time-dependent part of the application could operate anyway. Threading a Result
    // through every time-dependent code path would add noise for a practically
    // impossible (hardware/OS-level) failure.
    let now = SystemTime::now();
    now.duration_since(UNIX_EPOCH)
        .expect("Error calling `SystemTime`")
}
95
/// Returns the current UNIX time in nanoseconds, based on [`SystemTime::now()`].
///
/// # Panics
///
/// Panics if the duration in nanoseconds exceeds `u64::MAX`, or if the system time
/// is set before the UNIX epoch.
#[inline(always)]
#[must_use]
pub fn nanos_since_unix_epoch() -> u64 {
    let total = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Error calling `SystemTime`")
        .as_nanos();
    // `as_nanos` yields u128; guard the narrowing cast so a silent wrap is impossible.
    assert!(
        total <= u128::from(u64::MAX),
        "System time overflow: value exceeds u64::MAX nanoseconds"
    );
    total as u64
}
111
/// Represents an atomic timekeeping structure.
///
/// [`AtomicTime`] can act as a real-time clock or static clock based on its mode.
/// It uses an [`AtomicU64`] to atomically update the value using only immutable
/// references.
///
/// The `realtime` flag indicates which mode the clock is currently in.
/// For concurrency, this struct uses atomic operations with appropriate memory orderings:
/// - **Acquire/Release** for reading/writing in **static mode**.
/// - **Compare-and-exchange (`AcqRel`)** in real-time mode to guarantee monotonic increments.
// NOTE(review): `#[repr(C)]` fixes the field layout — presumably for FFI consumers; confirm.
#[repr(C)]
#[derive(Debug)]
pub struct AtomicTime {
    /// Indicates whether the clock is operating in **real-time mode** (`true`) or **static mode** (`false`).
    pub realtime: AtomicBool,
    /// The last recorded time (in UNIX nanoseconds). Updated atomically with compare-and-exchange
    /// in **real-time mode**, or simple store/fetch in **static mode**.
    pub timestamp_ns: AtomicU64,
}
131
impl Deref for AtomicTime {
    type Target = AtomicU64;

    /// Dereferences to the underlying [`AtomicU64`] timestamp, allowing raw atomic
    /// operations (`load`, `store`, `compare_exchange`) to be called directly on the clock.
    fn deref(&self) -> &Self::Target {
        &self.timestamp_ns
    }
}
139
impl Default for AtomicTime {
    /// Creates a new default [`AtomicTime`] instance in **real-time mode**.
    ///
    /// The stored value starts at `UnixNanos::default()` (zero); the first read through
    /// [`AtomicTime::time_since_epoch`] immediately advances it to the current system time.
    fn default() -> Self {
        Self::new(true, UnixNanos::default())
    }
}
146
147impl AtomicTime {
148    /// Creates a new [`AtomicTime`] instance.
149    ///
150    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
151    ///   and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
152    /// - If `realtime` is `false`, this clock starts in **static mode**, with the given `time`
153    ///   as its current value.
154    #[must_use]
155    pub fn new(realtime: bool, time: UnixNanos) -> Self {
156        Self {
157            realtime: AtomicBool::new(realtime),
158            timestamp_ns: AtomicU64::new(time.into()),
159        }
160    }
161
162    /// Returns the current time in nanoseconds, based on the clock’s mode.
163    ///
164    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
165    ///   timestamps across threads, using `AcqRel` semantics for the underlying atomic.
166    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
167    ///   threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
168    ///   will be visible here.
169    #[must_use]
170    pub fn get_time_ns(&self) -> UnixNanos {
171        if self.realtime.load(Ordering::Acquire) {
172            self.time_since_epoch()
173        } else {
174            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
175        }
176    }
177
178    /// Returns the current time as microseconds.
179    #[must_use]
180    pub fn get_time_us(&self) -> u64 {
181        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
182    }
183
184    /// Returns the current time as milliseconds.
185    #[must_use]
186    pub fn get_time_ms(&self) -> u64 {
187        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
188    }
189
190    /// Returns the current time as seconds.
191    #[must_use]
192    #[allow(
193        clippy::cast_precision_loss,
194        reason = "Precision loss acceptable for time conversion"
195    )]
196    pub fn get_time(&self) -> f64 {
197        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
198    }
199
200    /// Manually sets a new time for the clock (only meaningful in **static mode**).
201    ///
202    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
203    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
204    /// among all threads, but is enough to ensure that once a thread sees this update, it also
205    /// sees all writes made before this call in the writing thread.
206    ///
207    /// Typically used in single-threaded scenarios or coordinated concurrency in **static mode**,
208    /// since there’s no global ordering across threads.
209    ///
210    /// # Panics
211    ///
212    /// Panics if invoked when in real-time mode.
213    pub fn set_time(&self, time: UnixNanos) {
214        assert!(
215            !self.realtime.load(Ordering::Acquire),
216            "Cannot set time while clock is in realtime mode"
217        );
218
219        self.store(time.into(), Ordering::Release);
220    }
221
222    /// Increments the current (static-mode) time by `delta` nanoseconds and returns the updated value.
223    ///
224    /// Internally this uses [`AtomicU64::fetch_update`] with [`Ordering::AcqRel`] to ensure the increment is
225    /// atomic and visible to readers using `Acquire` loads.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the increment would overflow `u64::MAX` or if called
230    /// while the clock is in real-time mode.
231    pub fn increment_time(&self, delta: u64) -> anyhow::Result<UnixNanos> {
232        anyhow::ensure!(
233            !self.realtime.load(Ordering::Acquire),
234            "Cannot increment time while clock is in realtime mode"
235        );
236
237        let previous =
238            match self
239                .timestamp_ns
240                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
241                    current.checked_add(delta)
242                }) {
243                Ok(prev) => prev,
244                Err(_) => anyhow::bail!("Cannot increment time beyond u64::MAX"),
245            };
246
247        Ok(UnixNanos::from(previous + delta))
248    }
249
250    /// Retrieves and updates the current “real-time” clock, returning a strictly increasing
251    /// timestamp based on system time.
252    ///
253    /// Internally:
254    /// - We fetch `now` from [`SystemTime::now()`].
255    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
256    ///   timestamp is never less than the last timestamp.
257    ///
258    /// This ensures:
259    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
260    ///    one (by at least 1 nanosecond).
261    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
262    ///    monotonicity.
263    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
264    ///    once this compare-and-exchange completes.
265    ///
266    /// # Panics
267    ///
268    /// Panics if the internal counter has reached `u64::MAX`, which would indicate the process has
269    /// been running for longer than the representable range (~584 years) *or* the clock was
270    /// manually corrupted.
271    pub fn time_since_epoch(&self) -> UnixNanos {
272        // This method guarantees strict consistency but may incur a performance cost under
273        // high contention due to retries in the `compare_exchange` loop.
274        let now = nanos_since_unix_epoch();
275        loop {
276            // Acquire to observe the latest stored value
277            let last = self.load(Ordering::Acquire);
278            // Ensure we never wrap past u64::MAX – treat that as a fatal error
279            let incremented = last
280                .checked_add(1)
281                .expect("AtomicTime overflow: reached u64::MAX");
282            let next = now.max(incremented);
283            // AcqRel on success ensures this new value is published,
284            // Acquire on failure reloads if we lost a CAS race.
285            //
286            // Note that under heavy contention (many threads calling this in tight loops),
287            // the CAS loop may increase latency.
288            //
289            // However, in practice, the loop terminates quickly because:
290            // - System time naturally advances between iterations
291            // - Each iteration increments time by at least 1ns, preventing ABA problems
292            // - True contention requiring retry is rare in normal usage patterns
293            //
294            // The concurrent stress test (4 threads × 100k iterations) validates this approach.
295            if self
296                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
297                .is_ok()
298            {
299                return UnixNanos::from(next);
300            }
301        }
302    }
303
304    /// Switches the clock to **real-time mode** (`realtime = true`).
305    ///
306    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
307    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
308    /// Typically, switching modes is done infrequently, so the performance impact of `SeqCst`
309    /// here is acceptable.
310    pub fn make_realtime(&self) {
311        self.realtime.store(true, Ordering::SeqCst);
312    }
313
314    /// Switches the clock to **static mode** (`realtime = false`).
315    ///
316    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
317    /// mode switch if other threads also do `SeqCst` loads/stores of `realtime`.
318    pub fn make_static(&self) {
319        self.realtime.store(false, Ordering::SeqCst);
320    }
321}
322
#[cfg(test)]
mod tests {
    //! Tests for `AtomicTime`: mode switching, monotonicity, overflow handling,
    //! and the Acquire/Release memory-ordering contracts under concurrency.

    use std::sync::Arc;

    use rstest::*;

    use super::*;

    #[rstest]
    fn test_global_clocks_initialization() {
        let realtime_clock = get_atomic_clock_realtime();
        assert!(realtime_clock.get_time_ns().as_u64() > 0);

        let static_clock = get_atomic_clock_static();
        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
    }

    #[rstest]
    fn test_mode_switching() {
        let time = AtomicTime::new(true, UnixNanos::default());

        // Verify real-time mode
        let realtime_ns = time.get_time_ns();
        assert!(realtime_ns.as_u64() > 0);

        // Switch to static mode
        time.make_static();
        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
        let static_ns = time.get_time_ns();
        assert_eq!(static_ns.as_u64(), 1_000_000_000);

        // Switch back to real-time mode
        time.make_realtime();
        let new_realtime_ns = time.get_time_ns();
        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
    }

    #[rstest]
    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
    fn test_set_time_panics_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.set_time(UnixNanos::from(123));
    }

    #[rstest]
    fn test_increment_time_returns_error_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        let result = clock.increment_time(1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot increment time while clock is in realtime mode")
        );
    }

    #[rstest]
    #[should_panic(expected = "AtomicTime overflow")]
    fn test_time_since_epoch_overflow_panics() {
        // Manually construct a clock with the counter already at u64::MAX
        let clock = AtomicTime {
            realtime: AtomicBool::new(true),
            timestamp_ns: AtomicU64::new(u64::MAX),
        };

        // This call will attempt to add 1 and must panic
        let _ = clock.time_since_epoch();
    }

    #[rstest]
    fn test_mode_switching_concurrent() {
        let clock = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 10000;
        let mut handles = Vec::with_capacity(num_threads);

        for _ in 0..num_threads {
            let clock_clone = Arc::clone(&clock);
            let handle = std::thread::spawn(move || {
                for i in 0..iterations {
                    if i % 2 == 0 {
                        clock_clone.make_static();
                    } else {
                        clock_clone.make_realtime();
                    }
                    // Retrieve the time; we’re not asserting a particular value here,
                    // but at least we’re exercising the mode switch logic under concurrency.
                    let _ = clock_clone.get_time_ns();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_static_time_is_stable() {
        // Create a clock in static mode with an initial value
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        let time1 = clock.get_time_ns();

        // Sleep a bit to give the system time to change, if the clock were using real-time
        std::thread::sleep(std::time::Duration::from_millis(10));
        let time2 = clock.get_time_ns();

        // In static mode, the value should remain unchanged
        assert_eq!(time1, time2);
    }

    #[rstest]
    fn test_increment_time() {
        // Start in static mode
        let time = AtomicTime::new(false, UnixNanos::from(0));

        let updated_time = time.increment_time(500).unwrap();
        assert_eq!(updated_time.as_u64(), 500);

        let updated_time = time.increment_time(1_000).unwrap();
        assert_eq!(updated_time.as_u64(), 1_500);
    }

    #[rstest]
    fn test_increment_time_overflow_errors() {
        let time = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));

        let err = time.increment_time(10).unwrap_err();
        assert_eq!(err.to_string(), "Cannot increment time beyond u64::MAX");
    }

    #[rstest]
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        reason = "Intentional casts for comparing nanosecond counts as signed values"
    )]
    fn test_nanos_since_unix_epoch_vs_system_time() {
        let unix_nanos = nanos_since_unix_epoch();
        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
    }

    #[rstest]
    fn test_time_since_epoch_monotonicity() {
        let clock = get_atomic_clock_realtime();
        let mut previous = clock.time_since_epoch();
        for _ in 0..1_000_000 {
            let current = clock.time_since_epoch();
            assert!(current > previous);
            previous = current;
        }
    }

    #[rstest]
    fn test_time_since_epoch_strictly_increasing_concurrent() {
        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 100_000;
        let mut handles = Vec::with_capacity(num_threads);

        for thread_id in 0..num_threads {
            let time_clone = Arc::clone(&time);

            let handle = std::thread::spawn(move || {
                let mut previous = time_clone.time_since_epoch().as_u64();

                for i in 0..iterations {
                    let current = time_clone.time_since_epoch().as_u64();
                    assert!(
                        current > previous,
                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
                    );
                    previous = current;
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_duration_since_unix_epoch() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let duration = Duration::from_nanos(time.get_time_ns().into());
        let now = SystemTime::now();

        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
        let delta = now
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .checked_sub(duration);
        assert!(delta.unwrap_or_default() < Duration::from_millis(100));

        // Check if the duration is greater than a certain value (assuming the test is run after that point)
        assert!(duration > Duration::from_secs(1_650_000_000));
    }

    #[rstest]
    fn test_unix_timestamp_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time();
        let result2 = time.get_time();
        let result3 = time.get_time();
        let result4 = time.get_time();
        let result5 = time.get_time();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000.0);
    }

    #[rstest]
    fn test_unix_timestamp_ms_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ms();
        let result2 = time.get_time_ms();
        let result3 = time.get_time_ms();
        let result4 = time.get_time_ms();
        let result5 = time.get_time_ms();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 >= 1_650_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_us_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_us();
        let result2 = time.get_time_us();
        let result3 = time.get_time_us();
        let result4 = time.get_time_us();
        let result5 = time.get_time_us();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_ns_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ns();
        let result2 = time.get_time_ns();
        let result3 = time.get_time_ns();
        let result4 = time.get_time_ns();
        let result5 = time.get_time_ns();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
    }

    #[rstest]
    fn test_acquire_release_contract_static_mode() {
        // This test explicitly proves the Acquire/Release memory ordering contract:
        // - Writer thread uses set_time() which does Release store (see AtomicTime::set_time)
        // - Reader thread uses get_time_ns() which does Acquire load (see AtomicTime::get_time_ns)
        // - The Release-Acquire pair ensures all writes before Release are visible after Acquire

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        // Writer thread: updates auxiliary data, then releases via set_time
        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);

                // Release store via set_time creates a release fence - all prior writes (including aux_data)
                // must be visible to any thread that observes this time value via Acquire load
                writer_clock.set_time(UnixNanos::from(i * 1000));

                // Yield to encourage interleaving
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        // Reader thread: acquires via get_time_ns, then checks auxiliary data
        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux_seen = 0u64;

            // Poll until writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    // The Acquire in get_time_ns synchronizes with the Release in set_time,
                    // making aux_data visible
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never go backwards (proves Release-Acquire sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux_seen,
                            "Acquire/Release contract violated: aux went backwards from {max_aux_seen} to {aux_value}"
                        );
                        max_aux_seen = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check final state after writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux_seen,
                        "Acquire/Release contract violated: final aux {final_aux} < max {max_aux_seen}"
                    );
                    max_aux_seen = final_aux;
                }
            }

            max_aux_seen
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    #[rstest]
    fn test_acquire_release_contract_increment_time() {
        // Similar test for increment_time, which uses fetch_update with AcqRel (see AtomicTime::increment_time)

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);
                let _ = writer_clock.increment_time(1000).unwrap();
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux = 0u64;

            // Poll until writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never regress (proves AcqRel sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux,
                            "AcqRel contract violated: aux regressed from {max_aux} to {aux_value}"
                        );
                        max_aux = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check final state after writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux,
                        "AcqRel contract violated: final aux {final_aux} < max {max_aux}"
                    );
                    max_aux = final_aux;
                }
            }

            max_aux
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }
}
753}