nautilus_common/throttler/
mod.rs1pub mod callbacks;
17pub mod inner;
18
19use std::{cell::RefCell, fmt::Debug, rc::Rc};
20
21use callbacks::{ThrottlerProcess, ThrottlerResume};
22use inner::InnerThrottler;
23
24use crate::clock::Clock;
25
/// A rate limit: a message count paired with an interval length.
///
/// The tests in this module construct `RateLimit::new(5, 10)` and observe that
/// five messages are sent immediately while the sixth is held back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimit {
    /// Message count passed to the inner throttler as its limit.
    pub limit: usize,
    /// Interval length in nanoseconds passed to the inner throttler.
    pub interval_ns: u64,
}

impl RateLimit {
    /// Creates a new [`RateLimit`] from `limit` and `interval_ns`.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn new(limit: usize, interval_ns: u64) -> Self {
        Self { limit, interval_ns }
    }
}
40
41#[derive(Clone)]
46pub struct Throttler<T, F> {
47 inner: Rc<RefCell<InnerThrottler<T, F>>>,
48}
49
50impl<T, F> Debug for Throttler<T, F>
51where
52 T: Debug,
53{
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct(stringify!(Throttler))
56 .field("inner", &self.inner)
57 .finish()
58 }
59}
60
61impl<T, F> Throttler<T, F> {
62 pub fn new(
64 rate_limit: RateLimit,
65 clock: Rc<RefCell<dyn Clock>>,
66 timer_name: String,
67 output_send: F,
68 output_drop: Option<F>,
69 ) -> Self {
70 let inner = InnerThrottler::new(
71 rate_limit.limit,
72 rate_limit.interval_ns,
73 clock,
74 timer_name,
75 output_send,
76 output_drop,
77 );
78
79 Self {
80 inner: Rc::new(RefCell::new(inner)),
81 }
82 }
83
84 #[must_use]
85 pub fn qsize(&self) -> usize {
86 let inner = self.inner.borrow();
87 inner.buffer.len()
88 }
89
90 pub fn reset(&self) {
91 let mut inner = self.inner.borrow_mut();
92 inner.reset();
93 }
94
95 #[must_use]
96 pub fn used(&self) -> f64 {
97 let inner = self.inner.borrow();
98 inner.used()
99 }
100}
101
102impl<T, F> Throttler<T, F>
103where
104 T: 'static,
105 F: Fn(T) + 'static,
106{
107 pub fn send(&self, msg: T) {
108 let throttler_clone = Self {
109 inner: self.inner.clone(),
110 };
111 let mut inner = self.inner.borrow_mut();
112 inner.recv_count += 1;
113
114 if inner.is_limiting || inner.delta_next() > 0 {
115 inner.limit_msg(msg, throttler_clone);
116 } else {
117 inner.send_msg(msg);
118 }
119 }
120
121 fn get_process_callback(&self) -> ThrottlerProcess<T, F> {
122 ThrottlerProcess::new(self.inner.clone())
123 }
124
125 fn get_resume_callback(&self) -> ThrottlerResume<T, F> {
126 ThrottlerResume::new(self.inner.clone())
127 }
128}
129
130#[cfg(test)]
134mod tests {
135 use std::{cell::RefCell, rc::Rc};
136
137 use rstest::{fixture, rstest};
138
139 use super::{RateLimit, Throttler};
140 use crate::clock::TestClock;
141
142 struct TestThrottler {
147 throttler: Throttler<u64, Box<dyn Fn(u64)>>,
148 clock: Rc<RefCell<TestClock>>,
149 interval: u64,
150 }
151
152 #[fixture]
153 pub fn test_throttler_buffered() -> TestThrottler {
154 let output_send: Box<dyn Fn(u64)> = Box::new(|msg: u64| {
155 log::debug!("Sent: {msg}");
156 });
157 let clock = Rc::new(RefCell::new(TestClock::new()));
158 let inner_clock = Rc::clone(&clock);
159 let rate_limit = RateLimit::new(5, 10);
160 let interval = rate_limit.interval_ns;
161
162 TestThrottler {
163 throttler: Throttler::new(
164 rate_limit,
165 clock,
166 "buffer_timer".to_string(),
167 output_send,
168 None,
169 ),
170 clock: inner_clock,
171 interval,
172 }
173 }
174
175 #[fixture]
176 pub fn test_throttler_unbuffered() -> TestThrottler {
177 let output_send: Box<dyn Fn(u64)> = Box::new(|msg: u64| {
178 log::debug!("Sent: {msg}");
179 });
180 let output_drop: Box<dyn Fn(u64)> = Box::new(|msg: u64| {
181 log::debug!("Dropped: {msg}");
182 });
183 let clock = Rc::new(RefCell::new(TestClock::new()));
184 let inner_clock = Rc::clone(&clock);
185 let rate_limit = RateLimit::new(5, 10);
186 let interval = rate_limit.interval_ns;
187
188 TestThrottler {
189 throttler: Throttler::new(
190 rate_limit,
191 clock,
192 "dropper_timer".to_string(),
193 output_send,
194 Some(output_drop),
195 ),
196 clock: inner_clock,
197 interval,
198 }
199 }
200
201 #[rstest]
202 fn test_buffering_send_to_limit_becomes_throttled(mut test_throttler_buffered: TestThrottler) {
203 let throttler = &mut test_throttler_buffered.throttler;
204 for _ in 0..6 {
205 throttler.send(42);
206 }
207 assert_eq!(throttler.qsize(), 1);
208
209 let inner = throttler.inner.borrow();
210 assert!(inner.is_limiting);
211 assert_eq!(inner.recv_count, 6);
212 assert_eq!(inner.sent_count, 5);
213 assert_eq!(inner.clock.borrow().timer_names(), vec!["buffer_timer"]);
214 }
215
216 #[rstest]
217 fn test_buffering_used_when_sent_to_limit_returns_one(
218 mut test_throttler_buffered: TestThrottler,
219 ) {
220 let throttler = &mut test_throttler_buffered.throttler;
221
222 for _ in 0..5 {
223 throttler.send(42);
224 }
225
226 let inner = throttler.inner.borrow();
227 assert_eq!(inner.used(), 1.0);
228 assert_eq!(inner.recv_count, 5);
229 assert_eq!(inner.sent_count, 5);
230 }
231
232 #[rstest]
233 fn test_buffering_used_when_half_interval_from_limit_returns_one(
234 mut test_throttler_buffered: TestThrottler,
235 ) {
236 let throttler = &mut test_throttler_buffered.throttler;
237
238 for _ in 0..5 {
239 throttler.send(42);
240 }
241
242 let half_interval = test_throttler_buffered.interval / 2;
243 {
245 let mut clock = test_throttler_buffered.clock.borrow_mut();
246 clock.advance_time(half_interval.into(), true);
247 }
248
249 let inner = throttler.inner.borrow();
250 assert_eq!(inner.used(), 1.0);
251 assert_eq!(inner.recv_count, 5);
252 assert_eq!(inner.sent_count, 5);
253 }
254
255 #[rstest]
256 fn test_buffering_used_before_limit_when_halfway_returns_half(
257 mut test_throttler_buffered: TestThrottler,
258 ) {
259 let throttler = &mut test_throttler_buffered.throttler;
260
261 for _ in 0..3 {
262 throttler.send(42);
263 }
264
265 let inner = throttler.inner.borrow();
266 assert_eq!(inner.used(), 0.6);
267 assert_eq!(inner.recv_count, 3);
268 assert_eq!(inner.sent_count, 3);
269 }
270
271 #[rstest]
272 fn test_buffering_refresh_when_at_limit_sends_remaining_items(
273 mut test_throttler_buffered: TestThrottler,
274 ) {
275 let throttler = &mut test_throttler_buffered.throttler;
276
277 for _ in 0..6 {
278 throttler.send(42);
279 }
280
281 {
283 let mut clock = test_throttler_buffered.clock.borrow_mut();
284 let time_events = clock.advance_time(test_throttler_buffered.interval.into(), true);
285 for each_event in clock.match_handlers(time_events) {
286 drop(clock); each_event.callback.call(each_event.event);
289
290 clock = test_throttler_buffered.clock.borrow_mut();
292 }
293 }
294
295 {
297 let inner = throttler.inner.borrow();
298 assert_eq!(inner.used(), 0.2);
299 assert_eq!(inner.recv_count, 6);
300 assert_eq!(inner.sent_count, 6);
301 assert_eq!(inner.qsize(), 0);
302 }
303 }
304
305 #[rstest]
306 fn test_buffering_send_message_after_buffering_message(
307 mut test_throttler_buffered: TestThrottler,
308 ) {
309 let throttler = &mut test_throttler_buffered.throttler;
310
311 for _ in 0..6 {
312 throttler.send(42);
313 }
314
315 {
317 let mut clock = test_throttler_buffered.clock.borrow_mut();
318 let time_events = clock.advance_time(test_throttler_buffered.interval.into(), true);
319 for each_event in clock.match_handlers(time_events) {
320 drop(clock); each_event.callback.call(each_event.event);
323
324 clock = test_throttler_buffered.clock.borrow_mut();
326 }
327 }
328
329 for _ in 0..6 {
330 throttler.send(42);
331 }
332
333 {
335 let inner = throttler.inner.borrow();
336 assert_eq!(inner.used(), 1.0);
337 assert_eq!(inner.recv_count, 12);
338 assert_eq!(inner.sent_count, 10);
339 assert_eq!(inner.qsize(), 2);
340 }
341 }
342
343 #[rstest]
344 fn test_buffering_send_message_after_halfway_after_buffering_message(
345 mut test_throttler_buffered: TestThrottler,
346 ) {
347 let throttler = &mut test_throttler_buffered.throttler;
348
349 for _ in 0..6 {
350 throttler.send(42);
351 }
352
353 {
355 let mut clock = test_throttler_buffered.clock.borrow_mut();
356 let time_events = clock.advance_time(test_throttler_buffered.interval.into(), true);
357 for each_event in clock.match_handlers(time_events) {
358 drop(clock); each_event.callback.call(each_event.event);
361
362 clock = test_throttler_buffered.clock.borrow_mut();
364 }
365 }
366
367 for _ in 0..3 {
368 throttler.send(42);
369 }
370
371 {
373 let inner = throttler.inner.borrow();
374 assert_eq!(inner.used(), 0.8);
375 assert_eq!(inner.recv_count, 9);
376 assert_eq!(inner.sent_count, 9);
377 assert_eq!(inner.qsize(), 0);
378 }
379 }
380
381 #[rstest]
382 fn test_dropping_send_sends_message_to_handler(mut test_throttler_unbuffered: TestThrottler) {
383 let throttler = &mut test_throttler_unbuffered.throttler;
384 throttler.send(42);
385 let inner = throttler.inner.borrow();
386
387 assert!(!inner.is_limiting);
388 assert_eq!(inner.recv_count, 1);
389 assert_eq!(inner.sent_count, 1);
390 }
391
392 #[rstest]
393 fn test_dropping_send_to_limit_drops_message(mut test_throttler_unbuffered: TestThrottler) {
394 let throttler = &mut test_throttler_unbuffered.throttler;
395 for _ in 0..6 {
396 throttler.send(42);
397 }
398 assert_eq!(throttler.qsize(), 0);
399
400 let inner = throttler.inner.borrow();
401 assert!(inner.is_limiting);
402 assert_eq!(inner.used(), 1.0);
403 assert_eq!(inner.clock.borrow().timer_count(), 1);
404 assert_eq!(inner.clock.borrow().timer_names(), vec!["dropper_timer"]);
405 assert_eq!(inner.recv_count, 6);
406 assert_eq!(inner.sent_count, 5);
407 }
408
409 #[rstest]
410 fn test_dropping_advance_time_when_at_limit_dropped_message(
411 mut test_throttler_unbuffered: TestThrottler,
412 ) {
413 let throttler = &mut test_throttler_unbuffered.throttler;
414 for _ in 0..6 {
415 throttler.send(42);
416 }
417
418 {
420 let mut clock = test_throttler_unbuffered.clock.borrow_mut();
421 let time_events = clock.advance_time(test_throttler_unbuffered.interval.into(), true);
422 for each_event in clock.match_handlers(time_events) {
423 drop(clock); each_event.callback.call(each_event.event);
426
427 clock = test_throttler_unbuffered.clock.borrow_mut();
429 }
430 }
431
432 let inner = throttler.inner.borrow();
433 assert_eq!(inner.clock.borrow().timer_count(), 0);
434 assert!(!inner.is_limiting);
435 assert_eq!(inner.used(), 0.0);
436 assert_eq!(inner.recv_count, 6);
437 assert_eq!(inner.sent_count, 5);
438 }
439
440 #[rstest]
441 fn test_dropping_send_message_after_dropping_message(
442 mut test_throttler_unbuffered: TestThrottler,
443 ) {
444 let throttler = &mut test_throttler_unbuffered.throttler;
445 for _ in 0..6 {
446 throttler.send(42);
447 }
448
449 {
451 let mut clock = test_throttler_unbuffered.clock.borrow_mut();
452 let time_events = clock.advance_time(test_throttler_unbuffered.interval.into(), true);
453 for each_event in clock.match_handlers(time_events) {
454 drop(clock); each_event.callback.call(each_event.event);
457
458 clock = test_throttler_unbuffered.clock.borrow_mut();
460 }
461 }
462
463 throttler.send(42);
464
465 let inner = throttler.inner.borrow();
466 assert_eq!(inner.used(), 0.2);
467 assert_eq!(inner.clock.borrow().timer_count(), 0);
468 assert!(!inner.is_limiting);
469 assert_eq!(inner.recv_count, 7);
470 assert_eq!(inner.sent_count, 6);
471 }
472
473 use proptest::prelude::*;
474
/// One step of a generated throttler scenario.
#[derive(Clone, Debug)]
enum ThrottlerInput {
    /// Send the contained message through the throttler.
    SendMessage(u64),
    /// Advance the test clock by the contained amount (added to the current time).
    AdvanceClock(u8),
}
480
481 fn throttler_input_strategy() -> impl Strategy<Value = ThrottlerInput> {
483 prop_oneof![
484 2 => prop::bool::ANY.prop_map(|_| ThrottlerInput::SendMessage(42)),
485 8 => prop::num::u8::ANY.prop_map(|v| ThrottlerInput::AdvanceClock(v % 5 + 5)),
486 ]
487 }
488
489 fn throttler_test_strategy() -> impl Strategy<Value = Vec<ThrottlerInput>> {
491 prop::collection::vec(throttler_input_strategy(), 10..=150)
492 }
493
494 fn test_throttler_with_inputs(inputs: Vec<ThrottlerInput>) {
495 let TestThrottler {
496 throttler,
497 clock: test_clock,
498 interval,
499 } = test_throttler_buffered();
500 let mut sent_count = 0;
501
502 for input in inputs {
503 match input {
504 ThrottlerInput::SendMessage(msg) => {
505 throttler.send(msg);
506 sent_count += 1;
507 }
508 ThrottlerInput::AdvanceClock(duration) => {
509 let mut clock_ref = test_clock.borrow_mut();
510 let current_time = clock_ref.get_time_ns();
511 let time_events =
512 clock_ref.advance_time(current_time + u64::from(duration), true);
513 for each_event in clock_ref.match_handlers(time_events) {
514 drop(clock_ref);
515 each_event.callback.call(each_event.event);
516 clock_ref = test_clock.borrow_mut();
517 }
518 }
519 }
520
521 let inner = throttler.inner.borrow();
526 let buffered_messages = inner.qsize() > 0;
527 let now = inner.clock.borrow().timestamp_ns().as_u64();
528 let limit_filled_within_interval = inner
529 .timestamps
530 .get(inner.limit - 1)
531 .is_some_and(|&ts| (now - ts.as_u64()) < interval);
532 let expected_limiting = buffered_messages && limit_filled_within_interval;
533 assert_eq!(inner.is_limiting, expected_limiting);
534
535 let inner = throttler.inner.borrow();
537 assert_eq!(sent_count, inner.sent_count + inner.qsize());
538 }
539
540 let time_events = test_clock
542 .borrow_mut()
543 .advance_time((interval * 100).into(), true);
544 let mut clock_ref = test_clock.borrow_mut();
545 for each_event in clock_ref.match_handlers(time_events) {
546 drop(clock_ref);
547 each_event.callback.call(each_event.event);
548 clock_ref = test_clock.borrow_mut();
549 }
550 assert_eq!(throttler.qsize(), 0);
551 }
552
// Fixed input sequence for reproducing a failing generated case by hand.
#[test]
#[ignore = "Used for manually testing failing cases"]
fn test_case() {
    use ThrottlerInput::{AdvanceClock, SendMessage};

    let inputs = vec![
        SendMessage(42),
        AdvanceClock(5),
        SendMessage(42),
        SendMessage(42),
        SendMessage(42),
        SendMessage(42),
        SendMessage(42),
        AdvanceClock(5),
        SendMessage(42),
        SendMessage(42),
    ];

    test_throttler_with_inputs(inputs);
}
572
573 proptest! {
574 #[test]
575 fn test(inputs in throttler_test_strategy()) {
576 test_throttler_with_inputs(inputs);
577 }
578 }
579}