nautilus_network/ratelimiter/
clock.rs1use std::{
26 fmt::Debug,
27 ops::Add,
28 prelude::v1::*,
29 sync::{
30 Arc,
31 atomic::{AtomicU64, Ordering},
32 },
33 time::{Duration, Instant},
34};
35
36use super::nanos::Nanos;
37
38pub trait Reference:
40 Sized + Add<Nanos, Output = Self> + PartialEq + Eq + Ord + Copy + Clone + Send + Sync + Debug
41{
42 fn duration_since(&self, earlier: Self) -> Nanos;
47
48 #[must_use]
52 fn saturating_sub(&self, duration: Nanos) -> Self;
53}
54
55pub trait Clock: Clone {
57 type Instant: Reference;
59
60 fn now(&self) -> Self::Instant;
62}
63
64impl Reference for Duration {
65 fn duration_since(&self, earlier: Self) -> Nanos {
67 self.checked_sub(earlier)
68 .unwrap_or_else(|| Self::new(0, 0))
69 .into()
70 }
71
72 fn saturating_sub(&self, duration: Nanos) -> Self {
74 self.checked_sub(duration.into()).unwrap_or(*self)
75 }
76}
77
78impl Add<Nanos> for Duration {
79 type Output = Self;
80
81 fn add(self, other: Nanos) -> Self {
82 let other: Self = other.into();
83 self + other
84 }
85}
86
87#[derive(Debug, Clone, Default)]
95pub struct FakeRelativeClock {
96 now: Arc<AtomicU64>,
97}
98
99impl FakeRelativeClock {
100 pub fn advance(&self, by: Duration) {
106 let by: u64 = by
107 .as_nanos()
108 .try_into()
109 .expect("Cannot represent durations greater than 584 years");
110
111 let mut prev = self.now.load(Ordering::Acquire);
112 let mut next = prev + by;
113 while let Err(e) =
114 self.now
115 .compare_exchange_weak(prev, next, Ordering::Release, Ordering::Relaxed)
116 {
117 prev = e;
118 next = prev + by;
119 }
120 }
121}
122
123impl PartialEq for FakeRelativeClock {
124 fn eq(&self, other: &Self) -> bool {
125 self.now.load(Ordering::Relaxed) == other.now.load(Ordering::Relaxed)
126 }
127}
128
129impl Clock for FakeRelativeClock {
130 type Instant = Nanos;
131
132 fn now(&self) -> Self::Instant {
133 self.now.load(Ordering::Relaxed).into()
134 }
135}
136
137#[derive(Clone, Debug, Default)]
139pub struct MonotonicClock;
140
141impl Add<Nanos> for Instant {
142 type Output = Self;
143
144 fn add(self, other: Nanos) -> Self {
145 let other: Duration = other.into();
146 self + other
147 }
148}
149
150impl Reference for Instant {
151 fn duration_since(&self, earlier: Self) -> Nanos {
152 if earlier < *self {
153 (*self - earlier).into()
154 } else {
155 Nanos::from(Duration::new(0, 0))
156 }
157 }
158
159 fn saturating_sub(&self, duration: Nanos) -> Self {
160 self.checked_sub(duration.into()).unwrap_or(*self)
161 }
162}
163
164impl Clock for MonotonicClock {
165 type Instant = Instant;
166
167 fn now(&self) -> Self::Instant {
168 Instant::now()
169 }
170}
171
172#[cfg(test)]
173mod test {
174 use std::{sync::Arc, thread, time::Duration};
175
176 use rstest::rstest;
177
178 use super::*;
179
180 #[rstest]
181 fn fake_clock_parallel_advances() {
182 let clock = Arc::new(FakeRelativeClock::default());
183 let threads = std::iter::repeat_n((), 10)
184 .map(move |()| {
185 let clock = Arc::clone(&clock);
186 thread::spawn(move || {
187 for _ in 0..1_000_000 {
188 let now = clock.now();
189 clock.advance(Duration::from_nanos(1));
190 assert!(clock.now() > now);
191 }
192 })
193 })
194 .collect::<Vec<_>>();
195 for t in threads {
196 t.join().unwrap();
197 }
198 }
199
200 #[rstest]
201 fn duration_addition_coverage() {
202 let d = Duration::from_secs(1);
203 let one_ns = Nanos::from(1);
204 assert!(d + one_ns > d);
205 }
206}