nautilus_common/throttler/
inner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, collections::VecDeque, fmt::Debug, rc::Rc};
17
18use nautilus_core::{correctness::FAILED, UnixNanos};
19
20use super::Throttler;
21use crate::{clock::Clock, timer::TimeEventCallback};
22
/// Throttler rate limits messages by dropping or buffering them.
///
/// Throttler takes messages of type `T` and callbacks of type `F` that are
/// invoked to send or to drop a message.
pub struct InnerThrottler<T, F> {
    /// The number of messages received.
    pub recv_count: usize,
    /// The number of messages sent.
    pub sent_count: usize,
    /// Whether the throttler is currently limiting the message rate.
    pub is_limiting: bool,
    /// The maximum number of messages that can be sent within the interval.
    pub limit: usize,
    /// The buffer of messages waiting to be sent (new messages are pushed to the front).
    pub buffer: VecDeque<T>,
    /// The timestamps of the sent messages, most recent first.
    pub timestamps: VecDeque<UnixNanos>,
    /// The clock used to read the current time and to register time alerts.
    pub clock: Rc<RefCell<dyn Clock>>,
    /// The length of the rate limiting interval in nanoseconds.
    interval: u64,
    /// The name of the timer registered with the clock.
    timer_name: String,
    /// The callback to send a message.
    output_send: F,
    /// The callback to drop a message; when `None`, limited messages are buffered instead.
    output_drop: Option<F>,
}
51
52impl<T, F> Debug for InnerThrottler<T, F>
53where
54    T: Debug,
55{
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct(stringify!(InnerThrottler))
58            .field("recv_count", &self.recv_count)
59            .field("sent_count", &self.sent_count)
60            .field("is_limiting", &self.is_limiting)
61            .field("limit", &self.limit)
62            .field("buffer", &self.buffer)
63            .field("timestamps", &self.timestamps)
64            .field("interval", &self.interval)
65            .field("timer_name", &self.timer_name)
66            .finish()
67    }
68}
69
70impl<T, F> InnerThrottler<T, F> {
71    /// Creates a new [`InnerThrottler`] instance.
72    #[inline]
73    pub fn new(
74        limit: usize,
75        interval: u64,
76        clock: Rc<RefCell<dyn Clock>>,
77        timer_name: String,
78        output_send: F,
79        output_drop: Option<F>,
80    ) -> Self {
81        Self {
82            recv_count: 0,
83            sent_count: 0,
84            is_limiting: false,
85            limit,
86            buffer: VecDeque::new(),
87            timestamps: VecDeque::with_capacity(limit),
88            clock,
89            interval,
90            timer_name,
91            output_send,
92            output_drop,
93        }
94    }
95
96    /// Set timer with a callback to be triggered on next interval.
97    ///
98    /// Typically used to register callbacks:
99    /// - [`super::callbacks::ThrottlerProcess`] to process buffered messages
100    /// - [`super::callbacks::ThrottlerResume`] to stop buffering
101    #[inline]
102    pub fn set_timer(&mut self, callback: Option<TimeEventCallback>) {
103        let delta = self.delta_next();
104        let mut clock = self.clock.borrow_mut();
105        if clock.timer_names().contains(&self.timer_name.as_str()) {
106            clock.cancel_timer(&self.timer_name);
107        }
108        let alert_ts = clock.timestamp_ns() + delta;
109
110        clock
111            .set_time_alert_ns(&self.timer_name, alert_ts, callback)
112            .expect(FAILED);
113    }
114
115    /// Time delta when the next message can be sent.
116    #[inline]
117    pub fn delta_next(&mut self) -> u64 {
118        match self.timestamps.get(self.limit - 1) {
119            Some(ts) => {
120                let diff = self.clock.borrow().timestamp_ns().as_u64() - ts.as_u64();
121                self.interval.saturating_sub(diff)
122            }
123            None => 0,
124        }
125    }
126
127    /// Reset the throttler which clears internal state.
128    #[inline]
129    pub fn reset(&mut self) {
130        self.buffer.clear();
131        self.recv_count = 0;
132        self.sent_count = 0;
133        self.is_limiting = false;
134        self.timestamps.clear();
135    }
136
137    /// Fractional value of rate limit consumed in current interval.
138    #[inline]
139    pub fn used(&self) -> f64 {
140        if self.timestamps.is_empty() {
141            return 0.0;
142        }
143
144        let now = self.clock.borrow().timestamp_ns().as_i64();
145        let interval_start = now - self.interval as i64;
146
147        let messages_in_current_interval = self
148            .timestamps
149            .iter()
150            .take_while(|&&ts| ts.as_i64() > interval_start)
151            .count();
152
153        (messages_in_current_interval as f64) / (self.limit as f64)
154    }
155
156    /// Number of messages queued in buffer.
157    #[inline]
158    pub fn qsize(&self) -> usize {
159        self.buffer.len()
160    }
161}
162
163impl<T, F> InnerThrottler<T, F>
164where
165    T: 'static,
166    F: Fn(T) + 'static,
167{
168    #[inline]
169    pub fn send_msg(&mut self, msg: T) {
170        let now = self.clock.borrow().timestamp_ns();
171
172        if self.timestamps.len() >= self.limit {
173            self.timestamps.pop_back();
174        }
175        self.timestamps.push_front(now);
176
177        (self.output_send)(msg);
178        self.sent_count += 1;
179    }
180
181    #[inline]
182    pub fn limit_msg(&mut self, msg: T, throttler: Throttler<T, F>) {
183        let callback = if self.output_drop.is_none() {
184            self.buffer.push_front(msg);
185            log::debug!("Buffering {}", self.buffer.len());
186            Some(throttler.get_process_callback().into())
187        } else {
188            log::debug!("Dropping");
189            if let Some(drop) = &self.output_drop {
190                drop(msg);
191            }
192            Some(throttler.get_resume_callback().into())
193        };
194        if !self.is_limiting {
195            log::debug!("Limiting");
196            self.set_timer(callback);
197            self.is_limiting = true;
198        }
199    }
200}