nautilus_core/time.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! The core `AtomicTime` for real-time and static clocks.
//!
//! This module provides an atomic time abstraction that supports both real-time and static
//! clocks. It ensures thread-safe operation and monotonic time retrieval with nanosecond precision.
//!
//! # Modes
//!
//! - **Real-time mode:** The clock continuously syncs with system wall-clock time (via
//!   [`SystemTime::now()`]). To ensure strictly monotonic increments across multiple threads,
//!   internal updates use an atomic compare-and-exchange loop ([`AtomicTime::time_since_epoch`]).
//!   While this guarantees that every new timestamp is at least one nanosecond greater than the
//!   last, it may introduce contention when many threads call it heavily.
//!
//! - **Static mode:** The clock is manually controlled via [`AtomicTime::set_time`] or
//!   [`AtomicTime::increment_time`], which is useful for simulations and backtesting. You can
//!   switch modes at runtime using [`AtomicTime::make_realtime`] or [`AtomicTime::make_static`].
//!   In **static mode**, updates use acquire/release semantics so that a write in one thread is
//!   observable by another, but no strict global ordering is enforced for manual updates. If you
//!   need strong multi-threaded ordering in **static mode**, you must provide higher-level
//!   synchronization yourself.
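//!
//! # Example
//!
//! A minimal sketch of manual time control in **static mode**. The doctest assumes this module
//! is reachable as `nautilus_core::time` and that `UnixNanos` is re-exported at the crate root,
//! matching the imports used in this file:
//!
//! ```
//! use nautilus_core::{UnixNanos, time::AtomicTime};
//!
//! // Start in static mode at t = 0.
//! let clock = AtomicTime::new(false, UnixNanos::from(0));
//!
//! // Manually advance the clock, as a backtest engine might.
//! clock.set_time(UnixNanos::from(1_000));
//! assert_eq!(clock.get_time_ns().as_u64(), 1_000);
//!
//! let now = clock.increment_time(500).expect("no overflow in static mode");
//! assert_eq!(now.as_u64(), 1_500);
//! ```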

use std::{
    ops::Deref,
    sync::{
        OnceLock,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use crate::{
    UnixNanos,
    datetime::{NANOSECONDS_IN_MICROSECOND, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
};

/// Global atomic time in **real-time mode** for use across the system.
///
/// This clock operates in **real-time mode**, synchronizing with the system clock.
/// It provides globally unique, strictly increasing timestamps across threads.
pub static ATOMIC_CLOCK_REALTIME: OnceLock<AtomicTime> = OnceLock::new();

/// Global atomic time in **static mode** for use across the system.
///
/// This clock operates in **static mode**, where the time value can be set or incremented
/// manually. Useful for backtesting or simulated time control.
pub static ATOMIC_CLOCK_STATIC: OnceLock<AtomicTime> = OnceLock::new();

/// Returns a static reference to the global atomic clock in **real-time mode**.
///
/// This clock uses [`AtomicTime::time_since_epoch`] under the hood, ensuring strictly increasing
/// timestamps across threads.
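///
/// # Examples
///
/// A minimal sketch (assumes the crate is importable as `nautilus_core`):
///
/// ```
/// use nautilus_core::time::get_atomic_clock_realtime;
///
/// let clock = get_atomic_clock_realtime();
/// let t1 = clock.get_time_ns();
/// let t2 = clock.get_time_ns();
/// assert!(t2 > t1); // strictly increasing in real-time mode
/// ```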
pub fn get_atomic_clock_realtime() -> &'static AtomicTime {
    ATOMIC_CLOCK_REALTIME.get_or_init(AtomicTime::default)
}

/// Returns a static reference to the global atomic clock in **static mode**.
///
/// This clock allows manual time control via [`AtomicTime::set_time`] or [`AtomicTime::increment_time`],
/// and does not automatically sync with system time.
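///
/// # Examples
///
/// A minimal sketch (assumes the crate is importable as `nautilus_core`; note this is a global
/// clock, so callers sharing it should not assume a particular starting value):
///
/// ```
/// use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
///
/// let clock = get_atomic_clock_static();
/// clock.set_time(UnixNanos::from(42));
/// assert_eq!(clock.get_time_ns().as_u64(), 42);
/// ```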
pub fn get_atomic_clock_static() -> &'static AtomicTime {
    ATOMIC_CLOCK_STATIC.get_or_init(|| AtomicTime::new(false, UnixNanos::default()))
}

/// Returns the duration since the UNIX epoch based on [`SystemTime::now()`].
///
/// # Panics
///
/// Panics if the system time is set before the UNIX epoch.
#[inline(always)]
#[must_use]
pub fn duration_since_unix_epoch() -> Duration {
    // SAFETY: The expect() is acceptable here because:
    // - SystemTime failure indicates catastrophic system clock issues
    // - This would affect the entire application's ability to function
    // - Alternative error handling would complicate all time-dependent code paths
    // - Such failures are extremely rare in practice and indicate hardware/OS problems
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Error calling `SystemTime`")
}

/// Returns the current UNIX time in nanoseconds, based on [`SystemTime::now()`].
///
/// # Panics
///
/// Panics if the duration in nanoseconds exceeds `u64::MAX`.
#[inline(always)]
#[must_use]
pub fn nanos_since_unix_epoch() -> u64 {
    let ns = duration_since_unix_epoch().as_nanos();
    assert!(
        ns <= u128::from(u64::MAX),
        "System time overflow: value exceeds u64::MAX nanoseconds"
    );
    ns as u64
}

/// Represents an atomic timekeeping structure.
///
/// [`AtomicTime`] can act as a real-time clock or a static clock, depending on its mode.
/// It uses an [`AtomicU64`] so the value can be updated atomically through immutable
/// references.
///
/// The `realtime` flag indicates which mode the clock is currently in.
/// For concurrency, this struct uses atomic operations with appropriate memory orderings:
/// - **Acquire/Release** for reads/writes in **static mode**.
/// - **Compare-and-exchange (`AcqRel`)** in **real-time mode** to guarantee monotonic increments.
#[repr(C)]
#[derive(Debug)]
pub struct AtomicTime {
    /// Indicates whether the clock is operating in **real-time mode** (`true`) or **static mode** (`false`).
    pub realtime: AtomicBool,
    /// The last recorded time (in UNIX nanoseconds). Updated atomically with compare-and-exchange
    /// in **real-time mode**, or a simple store/load in **static mode**.
    pub timestamp_ns: AtomicU64,
}

impl Deref for AtomicTime {
    type Target = AtomicU64;

    fn deref(&self) -> &Self::Target {
        &self.timestamp_ns
    }
}

impl Default for AtomicTime {
    /// Creates a new default [`AtomicTime`] instance in **real-time mode**, starting at the current system time.
    fn default() -> Self {
        Self::new(true, UnixNanos::default())
    }
}

impl AtomicTime {
    /// Creates a new [`AtomicTime`] instance.
    ///
    /// - If `realtime` is `true`, the provided `time` is used only as an initial placeholder
    ///   and will quickly be overridden by calls to [`AtomicTime::time_since_epoch`].
    /// - If `realtime` is `false`, the clock starts in **static mode**, with the given `time`
    ///   as its current value.
    #[must_use]
    pub fn new(realtime: bool, time: UnixNanos) -> Self {
        Self {
            realtime: AtomicBool::new(realtime),
            timestamp_ns: AtomicU64::new(time.into()),
        }
    }

    /// Returns the current time in nanoseconds, based on the clock’s mode.
    ///
    /// - In **real-time mode**, calls [`AtomicTime::time_since_epoch`], ensuring strictly increasing
    ///   timestamps across threads, using `AcqRel` semantics for the underlying atomic.
    /// - In **static mode**, reads the stored time using [`Ordering::Acquire`]. Updates by other
    ///   threads using [`AtomicTime::set_time`] or [`AtomicTime::increment_time`] (Release/AcqRel)
    ///   will be visible here.
    #[must_use]
    pub fn get_time_ns(&self) -> UnixNanos {
        if self.realtime.load(Ordering::Acquire) {
            self.time_since_epoch()
        } else {
            UnixNanos::from(self.timestamp_ns.load(Ordering::Acquire))
        }
    }

    /// Returns the current time as microseconds.
    #[must_use]
    pub fn get_time_us(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MICROSECOND
    }

    /// Returns the current time as milliseconds.
    #[must_use]
    pub fn get_time_ms(&self) -> u64 {
        self.get_time_ns().as_u64() / NANOSECONDS_IN_MILLISECOND
    }

    /// Returns the current time as seconds.
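    ///
    /// # Examples
    ///
    /// A minimal sketch of the unit conversions in static mode (assumes the crate is importable
    /// as `nautilus_core`):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// // 1.5 seconds, expressed in nanoseconds.
    /// let clock = AtomicTime::new(false, UnixNanos::from(1_500_000_000));
    /// assert_eq!(clock.get_time_us(), 1_500_000);
    /// assert_eq!(clock.get_time_ms(), 1_500);
    /// assert!((clock.get_time() - 1.5).abs() < f64::EPSILON);
    /// ```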
    #[must_use]
    #[allow(
        clippy::cast_precision_loss,
        reason = "Precision loss acceptable for time conversion"
    )]
    pub fn get_time(&self) -> f64 {
        self.get_time_ns().as_f64() / (NANOSECONDS_IN_SECOND as f64)
    }

    /// Manually sets a new time for the clock (only possible in **static mode**).
    ///
    /// This uses an atomic store with [`Ordering::Release`], so any thread reading with
    /// [`Ordering::Acquire`] will see the updated time. This does *not* enforce a total ordering
    /// among all threads, but it is enough to ensure that once a thread sees this update, it also
    /// sees all writes made before this call in the writing thread.
    ///
    /// Typically used in single-threaded scenarios or under coordinated concurrency in
    /// **static mode**, since there is no global ordering across threads.
    ///
    /// # Panics
    ///
    /// Panics if invoked while the clock is in real-time mode.
    ///
    /// # Thread Safety
    ///
    /// The mode check is not atomic with the subsequent store. If another thread calls
    /// `make_realtime()` between the check and the store, the invariant can be violated.
    /// This is intentional: mode switching is a setup-time operation and should not
    /// occur concurrently with time operations. Callers must ensure mode switches are
    /// complete before resuming time operations.
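    ///
    /// # Examples
    ///
    /// A minimal sketch (assumes the crate is importable as `nautilus_core`):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(false, UnixNanos::from(0)); // static mode
    /// clock.set_time(UnixNanos::from(1_000_000_000)); // 1 second
    /// assert_eq!(clock.get_time_ns().as_u64(), 1_000_000_000);
    /// ```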
    pub fn set_time(&self, time: UnixNanos) {
        assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot set time while clock is in realtime mode"
        );

        self.store(time.into(), Ordering::Release);

        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant violated: mode switched to realtime during set_time"
        );
    }

    /// Increments the current (static-mode) time by `delta` nanoseconds and returns the updated value.
    ///
    /// Internally this uses [`AtomicU64::fetch_update`] with [`Ordering::AcqRel`] to ensure the
    /// increment is atomic and visible to readers using `Acquire` loads.
    ///
    /// # Errors
    ///
    /// Returns an error if the increment would overflow `u64::MAX`, or if called
    /// while the clock is in real-time mode.
    ///
    /// # Thread Safety
    ///
    /// The mode check is not atomic with the subsequent update. If another thread calls
    /// `make_realtime()` between the check and the update, the invariant can be violated.
    /// This is intentional: mode switching is a setup-time operation and should not
    /// occur concurrently with time operations. Callers must ensure mode switches are
    /// complete before resuming time operations.
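    ///
    /// # Examples
    ///
    /// A minimal sketch, including the overflow error path (assumes the crate is importable as
    /// `nautilus_core`):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(false, UnixNanos::from(0));
    /// assert_eq!(clock.increment_time(500).unwrap().as_u64(), 500);
    /// assert_eq!(clock.increment_time(1_000).unwrap().as_u64(), 1_500);
    ///
    /// let near_max = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));
    /// assert!(near_max.increment_time(10).is_err()); // would overflow u64::MAX
    /// ```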
    pub fn increment_time(&self, delta: u64) -> anyhow::Result<UnixNanos> {
        anyhow::ensure!(
            !self.realtime.load(Ordering::SeqCst),
            "Cannot increment time while clock is in realtime mode"
        );

        let previous =
            match self
                .timestamp_ns
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                    current.checked_add(delta)
                }) {
                Ok(prev) => prev,
                Err(_) => anyhow::bail!("Cannot increment time beyond u64::MAX"),
            };

        debug_assert!(
            !self.realtime.load(Ordering::SeqCst),
            "Invariant violated: mode switched to realtime during increment_time"
        );

        Ok(UnixNanos::from(previous + delta))
    }

    /// Updates and returns the current “real-time” clock value, as a strictly increasing
    /// timestamp based on system time.
    ///
    /// Internally:
    /// - We fetch `now` from [`SystemTime::now()`].
    /// - We do an atomic compare-and-exchange (using [`Ordering::AcqRel`]) to ensure the stored
    ///   timestamp is never less than the last timestamp.
    ///
    /// This ensures:
    /// 1. **Monotonic increments**: The returned timestamp is strictly greater than the previous
    ///    one (by at least 1 nanosecond).
    /// 2. **No backward jumps**: If the OS time moves backward, we ignore that shift to preserve
    ///    monotonicity.
    /// 3. **Visibility**: In a multi-threaded environment, other threads see the updated value
    ///    once this compare-and-exchange completes.
    ///
    /// # Panics
    ///
    /// Panics if the internal counter has reached `u64::MAX`, which would indicate the process has
    /// been running for longer than the representable range (~584 years) *or* the clock was
    /// manually corrupted.
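    ///
    /// # Examples
    ///
    /// A minimal sketch of the strict-monotonicity guarantee (assumes the crate is importable as
    /// `nautilus_core`):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(true, UnixNanos::default());
    /// let a = clock.time_since_epoch();
    /// let b = clock.time_since_epoch();
    /// assert!(b > a); // each call returns a strictly greater timestamp
    /// ```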
    pub fn time_since_epoch(&self) -> UnixNanos {
        // This method guarantees strict consistency but may incur a performance cost under
        // high contention due to retries in the `compare_exchange` loop.
        let now = nanos_since_unix_epoch();
        loop {
            // Acquire to observe the latest stored value
            let last = self.load(Ordering::Acquire);
            // Ensure we never wrap past u64::MAX – treat that as a fatal error
            let incremented = last
                .checked_add(1)
                .expect("AtomicTime overflow: reached u64::MAX");
            let next = now.max(incremented);
            // AcqRel on success ensures this new value is published,
            // Acquire on failure reloads if we lost a CAS race.
            //
            // Note that under heavy contention (many threads calling this in tight loops),
            // the CAS loop may increase latency.
            //
            // However, in practice, the loop terminates quickly because:
            // - System time naturally advances between iterations
            // - Each iteration increments time by at least 1ns, preventing ABA problems
            // - True contention requiring retry is rare in normal usage patterns
            //
            // The concurrent stress test (4 threads × 100k iterations) validates this approach.
            if self
                .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return UnixNanos::from(next);
            }
        }
    }

    /// Switches the clock to **real-time mode** (`realtime = true`).
    ///
    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
    /// mode switch if other threads also perform `SeqCst` loads/stores of `realtime`.
    /// Mode switching is typically infrequent, so the performance impact of `SeqCst` here is
    /// acceptable.
    pub fn make_realtime(&self) {
        self.realtime.store(true, Ordering::SeqCst);
    }

    /// Switches the clock to **static mode** (`realtime = false`).
    ///
    /// Uses [`Ordering::SeqCst`] for the mode store, which ensures a global ordering for the
    /// mode switch if other threads also perform `SeqCst` loads/stores of `realtime`.
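    ///
    /// # Examples
    ///
    /// A minimal sketch of switching modes at runtime (assumes the crate is importable as
    /// `nautilus_core`):
    ///
    /// ```
    /// use nautilus_core::{UnixNanos, time::AtomicTime};
    ///
    /// let clock = AtomicTime::new(true, UnixNanos::default()); // real-time
    /// clock.make_static();
    /// clock.set_time(UnixNanos::from(1_000_000_000));
    /// assert_eq!(clock.get_time_ns().as_u64(), 1_000_000_000);
    ///
    /// clock.make_realtime(); // resumes syncing with system time
    /// assert!(clock.get_time_ns().as_u64() > 1_000_000_000);
    /// ```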
    pub fn make_static(&self) {
        self.realtime.store(false, Ordering::SeqCst);
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::*;

    use super::*;

    #[rstest]
    fn test_global_clocks_initialization() {
        let realtime_clock = get_atomic_clock_realtime();
        assert!(realtime_clock.get_time_ns().as_u64() > 0);

        let static_clock = get_atomic_clock_static();
        static_clock.set_time(UnixNanos::from(500_000_000)); // 500 ms
        assert_eq!(static_clock.get_time_ns().as_u64(), 500_000_000);
    }

    #[rstest]
    fn test_mode_switching() {
        let time = AtomicTime::new(true, UnixNanos::default());

        // Verify real-time mode
        let realtime_ns = time.get_time_ns();
        assert!(realtime_ns.as_u64() > 0);

        // Switch to static mode
        time.make_static();
        time.set_time(UnixNanos::from(1_000_000_000)); // 1 second
        let static_ns = time.get_time_ns();
        assert_eq!(static_ns.as_u64(), 1_000_000_000);

        // Switch back to real-time mode
        time.make_realtime();
        let new_realtime_ns = time.get_time_ns();
        assert!(new_realtime_ns.as_u64() > static_ns.as_u64());
    }

    #[rstest]
    #[should_panic(expected = "Cannot set time while clock is in realtime mode")]
    fn test_set_time_panics_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        clock.set_time(UnixNanos::from(123));
    }

    #[rstest]
    fn test_increment_time_returns_error_in_realtime_mode() {
        let clock = AtomicTime::new(true, UnixNanos::default());
        let result = clock.increment_time(1);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot increment time while clock is in realtime mode")
        );
    }

    #[rstest]
    #[should_panic(expected = "AtomicTime overflow")]
    fn test_time_since_epoch_overflow_panics() {
        use std::sync::atomic::{AtomicBool, AtomicU64};

        // Manually construct a clock with the counter already at u64::MAX
        let clock = AtomicTime {
            realtime: AtomicBool::new(true),
            timestamp_ns: AtomicU64::new(u64::MAX),
        };

        // This call will attempt to add 1 and must panic
        let _ = clock.time_since_epoch();
    }

    #[rstest]
    fn test_mode_switching_concurrent() {
        let clock = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 10_000;
        let mut handles = Vec::with_capacity(num_threads);

        for _ in 0..num_threads {
            let clock_clone = Arc::clone(&clock);
            let handle = std::thread::spawn(move || {
                for i in 0..iterations {
                    if i % 2 == 0 {
                        clock_clone.make_static();
                    } else {
                        clock_clone.make_realtime();
                    }
                    // Retrieve the time; we’re not asserting a particular value here,
                    // but at least we’re exercising the mode switch logic under concurrency.
                    let _ = clock_clone.get_time_ns();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_static_time_is_stable() {
        // Create a clock in static mode with an initial value
        let clock = AtomicTime::new(false, UnixNanos::from(42));
        let time1 = clock.get_time_ns();

        // Sleep briefly; if the clock were in real-time mode, the system time would advance
        std::thread::sleep(std::time::Duration::from_millis(10));
        let time2 = clock.get_time_ns();

        // In static mode, the value should remain unchanged
        assert_eq!(time1, time2);
    }

    #[rstest]
    fn test_increment_time() {
        // Start in static mode
        let time = AtomicTime::new(false, UnixNanos::from(0));

        let updated_time = time.increment_time(500).unwrap();
        assert_eq!(updated_time.as_u64(), 500);

        let updated_time = time.increment_time(1_000).unwrap();
        assert_eq!(updated_time.as_u64(), 1_500);
    }

    #[rstest]
    fn test_increment_time_overflow_errors() {
        let time = AtomicTime::new(false, UnixNanos::from(u64::MAX - 5));

        let err = time.increment_time(10).unwrap_err();
        assert_eq!(err.to_string(), "Cannot increment time beyond u64::MAX");
    }

    #[rstest]
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        reason = "Intentional casts for comparing clock readings"
    )]
    fn test_nanos_since_unix_epoch_vs_system_time() {
        let unix_nanos = nanos_since_unix_epoch();
        let system_ns = duration_since_unix_epoch().as_nanos() as u64;
        assert!((unix_nanos as i64 - system_ns as i64).abs() < NANOSECONDS_IN_SECOND as i64);
    }

    #[rstest]
    fn test_time_since_epoch_monotonicity() {
        let clock = get_atomic_clock_realtime();
        let mut previous = clock.time_since_epoch();
        for _ in 0..1_000_000 {
            let current = clock.time_since_epoch();
            assert!(current > previous);
            previous = current;
        }
    }

    #[rstest]
    fn test_time_since_epoch_strictly_increasing_concurrent() {
        let time = Arc::new(AtomicTime::new(true, UnixNanos::default()));
        let num_threads = 4;
        let iterations = 100_000;
        let mut handles = Vec::with_capacity(num_threads);

        for thread_id in 0..num_threads {
            let time_clone = Arc::clone(&time);

            let handle = std::thread::spawn(move || {
                let mut previous = time_clone.time_since_epoch().as_u64();

                for i in 0..iterations {
                    let current = time_clone.time_since_epoch().as_u64();
                    assert!(
                        current > previous,
                        "Thread {thread_id}: iteration {i}: time did not increase: previous={previous}, current={current}",
                    );
                    previous = current;
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[rstest]
    fn test_duration_since_unix_epoch() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let duration = Duration::from_nanos(time.get_time_ns().into());
        let now = SystemTime::now();

        // Check if the duration is close to the actual difference between now and UNIX_EPOCH
        let delta = now
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .checked_sub(duration);
        assert!(delta.unwrap_or_default() < Duration::from_millis(100));

        // Check if the duration is greater than a certain value (assuming the test is run after that point)
        assert!(duration > Duration::from_secs(1_650_000_000));
    }

    #[rstest]
    fn test_unix_timestamp_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time();
        let result2 = time.get_time();
        let result3 = time.get_time();
        let result4 = time.get_time();
        let result5 = time.get_time();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000.0);
    }

    #[rstest]
    fn test_unix_timestamp_ms_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ms();
        let result2 = time.get_time_ms();
        let result3 = time.get_time_ms();
        let result4 = time.get_time_ms();
        let result5 = time.get_time_ms();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 >= 1_650_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_us_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_us();
        let result2 = time.get_time_us();
        let result3 = time.get_time_us();
        let result4 = time.get_time_us();
        let result5 = time.get_time_us();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1 > 1_650_000_000_000_000);
    }

    #[rstest]
    fn test_unix_timestamp_ns_is_monotonic_increasing() {
        let time = AtomicTime::new(true, UnixNanos::default());
        let result1 = time.get_time_ns();
        let result2 = time.get_time_ns();
        let result3 = time.get_time_ns();
        let result4 = time.get_time_ns();
        let result5 = time.get_time_ns();

        assert!(result2 >= result1);
        assert!(result3 >= result2);
        assert!(result4 >= result3);
        assert!(result5 >= result4);
        assert!(result1.as_u64() > 1_650_000_000_000_000_000);
    }

    #[rstest]
    fn test_acquire_release_contract_static_mode() {
        // This test explicitly proves the Acquire/Release memory ordering contract:
        // - Writer thread uses set_time() which does a Release store (see AtomicTime::set_time)
        // - Reader thread uses get_time_ns() which does an Acquire load (see AtomicTime::get_time_ns)
        // - The Release-Acquire pair ensures all writes before the Release are visible after the Acquire

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        // Writer thread: updates auxiliary data, then releases via set_time
        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);

                // The Release store in set_time ensures all prior writes (including aux_data)
                // are visible to any thread that observes this time value via an Acquire load
                writer_clock.set_time(UnixNanos::from(i * 1000));

                // Yield to encourage interleaving
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        // Reader thread: acquires via get_time_ns, then checks auxiliary data
        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux_seen = 0u64;

            // Poll until the writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    // The Acquire in get_time_ns synchronizes with the Release in set_time,
                    // making aux_data visible
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never go backwards (proves Release-Acquire sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux_seen,
                            "Acquire/Release contract violated: aux went backwards from {max_aux_seen} to {aux_value}"
                        );
                        max_aux_seen = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check the final state after the writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux_seen,
                        "Acquire/Release contract violated: final aux {final_aux} < max {max_aux_seen}"
                    );
                    max_aux_seen = final_aux;
                }
            }

            max_aux_seen
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }

    #[rstest]
    fn test_acquire_release_contract_increment_time() {
        // Similar test for increment_time, which uses fetch_update with AcqRel (see AtomicTime::increment_time)

        let clock = Arc::new(AtomicTime::new(false, UnixNanos::from(0)));
        let aux_data = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));

        let writer_clock = Arc::clone(&clock);
        let writer_aux = Arc::clone(&aux_data);
        let writer_done = Arc::clone(&done);

        let writer = std::thread::spawn(move || {
            for i in 1..=1_000u64 {
                writer_aux.store(i, Ordering::Relaxed);
                let _ = writer_clock.increment_time(1000).unwrap();
                std::thread::yield_now();
            }
            writer_done.store(true, Ordering::Release);
        });

        let reader_clock = Arc::clone(&clock);
        let reader_aux = Arc::clone(&aux_data);
        let reader_done = Arc::clone(&done);

        let reader = std::thread::spawn(move || {
            let mut last_time = 0u64;
            let mut max_aux = 0u64;

            // Poll until the writer is done, with no iteration limit
            while !reader_done.load(Ordering::Acquire) {
                let current_time = reader_clock.get_time_ns().as_u64();

                if current_time > last_time {
                    let aux_value = reader_aux.load(Ordering::Relaxed);

                    // Invariant: aux_value must never regress (proves AcqRel sync works)
                    if aux_value > 0 {
                        assert!(
                            aux_value >= max_aux,
                            "AcqRel contract violated: aux regressed from {max_aux} to {aux_value}"
                        );
                        max_aux = aux_value;
                    }

                    last_time = current_time;
                }

                std::thread::yield_now();
            }

            // Check the final state after the writer completes to ensure we observe updates
            let final_time = reader_clock.get_time_ns().as_u64();
            if final_time > last_time {
                let final_aux = reader_aux.load(Ordering::Relaxed);
                if final_aux > 0 {
                    assert!(
                        final_aux >= max_aux,
                        "AcqRel contract violated: final aux {final_aux} < max {max_aux}"
                    );
                    max_aux = final_aux;
                }
            }

            max_aux
        });

        writer.join().unwrap();
        let max_observed = reader.join().unwrap();

        // Ensure the reader actually observed updates (not vacuously satisfied)
        assert!(max_observed > 0, "Reader must observe writer updates");
    }
}
779}