nautilus_common/throttler/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod callbacks;
17pub mod inner;
18
19use std::{cell::RefCell, fmt::Debug, rc::Rc};
20
21use callbacks::{ThrottlerProcess, ThrottlerResume};
22use inner::InnerThrottler;
23
24use crate::clock::Clock;
25
/// A throttling limit: at most `limit` messages per interval of `interval_ns` nanoseconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimit {
    /// Maximum number of messages allowed within one interval.
    pub limit: usize,
    /// Length of the interval, in nanoseconds.
    pub interval_ns: u64,
}

impl RateLimit {
    /// Creates a new [`RateLimit`] instance from a message count and an interval length.
    ///
    /// Both values are stored as given; no validation is performed.
    #[must_use]
    pub const fn new(limit: usize, interval_ns: u64) -> Self {
        Self { limit, interval_ns }
    }
}
40
41/// Shareable reference to an [`InnerThrottler`]
42///
43/// Throttler takes messages of type T and callback of type F for dropping
44/// or processing messages.
45#[derive(Clone)]
46pub struct Throttler<T, F> {
47    inner: Rc<RefCell<InnerThrottler<T, F>>>,
48}
49
50impl<T, F> Debug for Throttler<T, F>
51where
52    T: Debug,
53{
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct(stringify!(Throttler))
56            .field("inner", &self.inner)
57            .finish()
58    }
59}
60
61impl<T, F> Throttler<T, F> {
62    /// Creates a new [`Throttler`] instance.
63    pub fn new(
64        rate_limit: RateLimit,
65        clock: Rc<RefCell<dyn Clock>>,
66        timer_name: String,
67        output_send: F,
68        output_drop: Option<F>,
69    ) -> Self {
70        let inner = InnerThrottler::new(
71            rate_limit.limit,
72            rate_limit.interval_ns,
73            clock,
74            timer_name,
75            output_send,
76            output_drop,
77        );
78
79        Self {
80            inner: Rc::new(RefCell::new(inner)),
81        }
82    }
83
84    #[must_use]
85    pub fn qsize(&self) -> usize {
86        let inner = self.inner.borrow();
87        inner.buffer.len()
88    }
89
90    pub fn reset(&self) {
91        let mut inner = self.inner.borrow_mut();
92        inner.reset();
93    }
94
95    #[must_use]
96    pub fn used(&self) -> f64 {
97        let inner = self.inner.borrow();
98        inner.used()
99    }
100}
101
102impl<T, F> Throttler<T, F>
103where
104    T: 'static,
105    F: Fn(T) + 'static,
106{
107    pub fn send(&self, msg: T) {
108        let throttler_clone = Self {
109            inner: self.inner.clone(),
110        };
111        let mut inner = self.inner.borrow_mut();
112        inner.recv_count += 1;
113
114        if inner.is_limiting || inner.delta_next() > 0 {
115            inner.limit_msg(msg, throttler_clone);
116        } else {
117            inner.send_msg(msg);
118        }
119    }
120
121    fn get_process_callback(&self) -> ThrottlerProcess<T, F> {
122        ThrottlerProcess::new(self.inner.clone())
123    }
124
125    fn get_resume_callback(&self) -> ThrottlerResume<T, F> {
126        ThrottlerResume::new(self.inner.clone())
127    }
128}
129
130////////////////////////////////////////////////////////////////////////////////
131// Tests
132////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use proptest::prelude::*;
    use rstest::{fixture, rstest};

    use super::{RateLimit, Throttler};
    use crate::clock::TestClock;

    /// Test throttler with default values for testing.
    ///
    /// - Rate limit is 5 messages per interval of 10 ns.
    /// - Message handling (buffering or dropping) is decided by the specific fixture.
    struct TestThrottler {
        throttler: Throttler<u64, Box<dyn Fn(u64)>>,
        clock: Rc<RefCell<TestClock>>,
        interval: u64,
    }

    /// Builds a [`TestThrottler`] allowing 5 messages per 10 ns on a fresh [`TestClock`].
    ///
    /// `output_drop` is passed through to [`Throttler::new`] unchanged.
    fn build_test_throttler(
        timer_name: &str,
        output_drop: Option<Box<dyn Fn(u64)>>,
    ) -> TestThrottler {
        let output_send: Box<dyn Fn(u64)> = Box::new(|msg: u64| {
            log::debug!("Sent: {msg}");
        });
        let clock = Rc::new(RefCell::new(TestClock::new()));
        // Keep a concretely typed handle so tests can drive the clock directly
        let inner_clock = Rc::clone(&clock);
        let rate_limit = RateLimit::new(5, 10);
        let interval = rate_limit.interval_ns;

        TestThrottler {
            throttler: Throttler::new(
                rate_limit,
                clock,
                timer_name.to_string(),
                output_send,
                output_drop,
            ),
            clock: inner_clock,
            interval,
        }
    }

    /// Advances `clock` by `delta_ns` relative to its current time, then runs the
    /// callback of every time event produced by that advance.
    ///
    /// The clock borrow is released before any callback runs, so callbacks are free to
    /// borrow the clock themselves.
    fn advance_clock_by(clock: &Rc<RefCell<TestClock>>, delta_ns: u64) {
        let handlers = {
            let mut clock = clock.borrow_mut();
            let target = clock.get_time_ns() + delta_ns;
            let time_events = clock.advance_time(target, true);
            clock.match_handlers(time_events)
        };

        for handler in handlers {
            handler.callback.call(handler.event);
        }
    }

    #[fixture]
    pub fn test_throttler_buffered() -> TestThrottler {
        build_test_throttler("buffer_timer", None)
    }

    #[fixture]
    pub fn test_throttler_unbuffered() -> TestThrottler {
        let output_drop: Box<dyn Fn(u64)> = Box::new(|msg: u64| {
            log::debug!("Dropped: {msg}");
        });

        build_test_throttler("dropper_timer", Some(output_drop))
    }

    #[rstest]
    fn test_buffering_send_to_limit_becomes_throttled(test_throttler_buffered: TestThrottler) {
        let TestThrottler { throttler, .. } = test_throttler_buffered;
        for _ in 0..6 {
            throttler.send(42);
        }
        assert_eq!(throttler.qsize(), 1);

        let inner = throttler.inner.borrow();
        assert!(inner.is_limiting);
        assert_eq!(inner.recv_count, 6);
        assert_eq!(inner.sent_count, 5);
        assert_eq!(inner.clock.borrow().timer_names(), vec!["buffer_timer"]);
    }

    #[rstest]
    fn test_buffering_used_when_sent_to_limit_returns_one(test_throttler_buffered: TestThrottler) {
        let TestThrottler { throttler, .. } = test_throttler_buffered;

        for _ in 0..5 {
            throttler.send(42);
        }

        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 1.0);
        assert_eq!(inner.recv_count, 5);
        assert_eq!(inner.sent_count, 5);
    }

    #[rstest]
    fn test_buffering_used_when_half_interval_from_limit_returns_one(
        test_throttler_buffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_buffered;

        for _ in 0..5 {
            throttler.send(42);
        }

        // Advance the clock to half the interval without running any timer callbacks
        let half_interval = interval / 2;
        clock.borrow_mut().advance_time(half_interval.into(), true);

        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 1.0);
        assert_eq!(inner.recv_count, 5);
        assert_eq!(inner.sent_count, 5);
    }

    #[rstest]
    fn test_buffering_used_before_limit_when_halfway_returns_half(
        test_throttler_buffered: TestThrottler,
    ) {
        let TestThrottler { throttler, .. } = test_throttler_buffered;

        for _ in 0..3 {
            throttler.send(42);
        }

        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 0.6);
        assert_eq!(inner.recv_count, 3);
        assert_eq!(inner.sent_count, 3);
    }

    #[rstest]
    fn test_buffering_refresh_when_at_limit_sends_remaining_items(
        test_throttler_buffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_buffered;

        for _ in 0..6 {
            throttler.send(42);
        }

        // Advance time by one interval and process the resulting events
        advance_clock_by(&clock, interval);

        // Assert final state
        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 0.2);
        assert_eq!(inner.recv_count, 6);
        assert_eq!(inner.sent_count, 6);
        assert_eq!(inner.qsize(), 0);
    }

    #[rstest]
    fn test_buffering_send_message_after_buffering_message(
        test_throttler_buffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_buffered;

        for _ in 0..6 {
            throttler.send(42);
        }

        // Advance time by one interval and process the resulting events
        advance_clock_by(&clock, interval);

        for _ in 0..6 {
            throttler.send(42);
        }

        // Assert final state
        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 1.0);
        assert_eq!(inner.recv_count, 12);
        assert_eq!(inner.sent_count, 10);
        assert_eq!(inner.qsize(), 2);
    }

    #[rstest]
    fn test_buffering_send_message_after_halfway_after_buffering_message(
        test_throttler_buffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_buffered;

        for _ in 0..6 {
            throttler.send(42);
        }

        // Advance time by one interval and process the resulting events
        advance_clock_by(&clock, interval);

        for _ in 0..3 {
            throttler.send(42);
        }

        // Assert final state
        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 0.8);
        assert_eq!(inner.recv_count, 9);
        assert_eq!(inner.sent_count, 9);
        assert_eq!(inner.qsize(), 0);
    }

    #[rstest]
    fn test_dropping_send_sends_message_to_handler(test_throttler_unbuffered: TestThrottler) {
        let TestThrottler { throttler, .. } = test_throttler_unbuffered;
        throttler.send(42);
        let inner = throttler.inner.borrow();

        assert!(!inner.is_limiting);
        assert_eq!(inner.recv_count, 1);
        assert_eq!(inner.sent_count, 1);
    }

    #[rstest]
    fn test_dropping_send_to_limit_drops_message(test_throttler_unbuffered: TestThrottler) {
        let TestThrottler { throttler, .. } = test_throttler_unbuffered;
        for _ in 0..6 {
            throttler.send(42);
        }
        assert_eq!(throttler.qsize(), 0);

        let inner = throttler.inner.borrow();
        assert!(inner.is_limiting);
        assert_eq!(inner.used(), 1.0);
        assert_eq!(inner.clock.borrow().timer_count(), 1);
        assert_eq!(inner.clock.borrow().timer_names(), vec!["dropper_timer"]);
        assert_eq!(inner.recv_count, 6);
        assert_eq!(inner.sent_count, 5);
    }

    #[rstest]
    fn test_dropping_advance_time_when_at_limit_dropped_message(
        test_throttler_unbuffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_unbuffered;
        for _ in 0..6 {
            throttler.send(42);
        }

        // Advance time by one interval and process the resulting events
        advance_clock_by(&clock, interval);

        let inner = throttler.inner.borrow();
        assert_eq!(inner.clock.borrow().timer_count(), 0);
        assert!(!inner.is_limiting);
        assert_eq!(inner.used(), 0.0);
        assert_eq!(inner.recv_count, 6);
        assert_eq!(inner.sent_count, 5);
    }

    #[rstest]
    fn test_dropping_send_message_after_dropping_message(
        test_throttler_unbuffered: TestThrottler,
    ) {
        let TestThrottler {
            throttler,
            clock,
            interval,
        } = test_throttler_unbuffered;
        for _ in 0..6 {
            throttler.send(42);
        }

        // Advance time by one interval and process the resulting events
        advance_clock_by(&clock, interval);

        throttler.send(42);

        let inner = throttler.inner.borrow();
        assert_eq!(inner.used(), 0.2);
        assert_eq!(inner.clock.borrow().timer_count(), 0);
        assert!(!inner.is_limiting);
        assert_eq!(inner.recv_count, 7);
        assert_eq!(inner.sent_count, 6);
    }

    /// A single step of a generated throttler scenario.
    #[derive(Clone, Debug)]
    enum ThrottlerInput {
        /// Send the contained message through the throttler.
        SendMessage(u64),
        /// Advance the test clock by the contained number of nanoseconds.
        AdvanceClock(u8),
    }

    // Custom strategy for ThrottlerInput: 20% sends, 80% clock advances of 5..=9 ns
    fn throttler_input_strategy() -> impl Strategy<Value = ThrottlerInput> {
        prop_oneof![
            2 => prop::bool::ANY.prop_map(|_| ThrottlerInput::SendMessage(42)),
            8 => prop::num::u8::ANY.prop_map(|v| ThrottlerInput::AdvanceClock(v % 5 + 5)),
        ]
    }

    // Custom strategy for ThrottlerTest: between 10 and 150 inputs per scenario
    fn throttler_test_strategy() -> impl Strategy<Value = Vec<ThrottlerInput>> {
        prop::collection::vec(throttler_input_strategy(), 10..=150)
    }

    /// Replays `inputs` against a buffered throttler, checking the limiting flag and
    /// message conservation after every step, then verifies the buffer drains.
    fn test_throttler_with_inputs(inputs: Vec<ThrottlerInput>) {
        let TestThrottler {
            throttler,
            clock: test_clock,
            interval,
        } = test_throttler_buffered();
        // Number of messages handed to `send` so far (not the number delivered)
        let mut submitted_count = 0;

        for input in inputs {
            match input {
                ThrottlerInput::SendMessage(msg) => {
                    throttler.send(msg);
                    submitted_count += 1;
                }
                ThrottlerInput::AdvanceClock(duration) => {
                    advance_clock_by(&test_clock, u64::from(duration));
                }
            }

            // The throttler should be limiting exactly when:
            // * At least one message is buffered
            // * The timestamp queue is filled up to the limit
            // * The timestamp at index `limit - 1` is still within the interval
            let inner = throttler.inner.borrow();
            let buffered_messages = inner.qsize() > 0;
            let now = inner.clock.borrow().timestamp_ns().as_u64();
            let limit_filled_within_interval = inner
                .timestamps
                .get(inner.limit - 1)
                .is_some_and(|&ts| (now - ts.as_u64()) < interval);
            let expected_limiting = buffered_messages && limit_filled_within_interval;
            assert_eq!(inner.is_limiting, expected_limiting);

            // Message conservation: every submitted message is either sent or buffered
            assert_eq!(submitted_count, inner.sent_count + inner.qsize());
        }

        // Advance the clock by a large amount, relative to wherever the scenario left
        // it, to process all remaining messages. An absolute target of `interval * 100`
        // would not lie ahead of the clock once the accumulated advances exceed it.
        advance_clock_by(&test_clock, interval * 100);
        assert_eq!(throttler.qsize(), 0);
    }

    #[test]
    #[ignore = "Used for manually testing failing cases"]
    fn test_case() {
        let inputs = vec![
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::AdvanceClock(5),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::AdvanceClock(5),
            ThrottlerInput::SendMessage(42),
            ThrottlerInput::SendMessage(42),
        ];

        test_throttler_with_inputs(inputs);
    }

    proptest! {
        #[test]
        fn test(inputs in throttler_test_strategy()) {
            test_throttler_with_inputs(inputs);
        }
    }
}