nautilus_common/throttler/
inner.rs1use std::{cell::RefCell, collections::VecDeque, fmt::Debug, rc::Rc};
17
18use nautilus_core::{correctness::FAILED, UnixNanos};
19
20use super::Throttler;
21use crate::{clock::Clock, timer::TimeEventCallback};
22
23pub struct InnerThrottler<T, F> {
28 pub recv_count: usize,
30 pub sent_count: usize,
32 pub is_limiting: bool,
34 pub limit: usize,
36 pub buffer: VecDeque<T>,
38 pub timestamps: VecDeque<UnixNanos>,
40 pub clock: Rc<RefCell<dyn Clock>>,
42 interval: u64,
44 timer_name: String,
46 output_send: F,
48 output_drop: Option<F>,
50}
51
52impl<T, F> Debug for InnerThrottler<T, F>
53where
54 T: Debug,
55{
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct(stringify!(InnerThrottler))
58 .field("recv_count", &self.recv_count)
59 .field("sent_count", &self.sent_count)
60 .field("is_limiting", &self.is_limiting)
61 .field("limit", &self.limit)
62 .field("buffer", &self.buffer)
63 .field("timestamps", &self.timestamps)
64 .field("interval", &self.interval)
65 .field("timer_name", &self.timer_name)
66 .finish()
67 }
68}
69
70impl<T, F> InnerThrottler<T, F> {
71 #[inline]
73 pub fn new(
74 limit: usize,
75 interval: u64,
76 clock: Rc<RefCell<dyn Clock>>,
77 timer_name: String,
78 output_send: F,
79 output_drop: Option<F>,
80 ) -> Self {
81 Self {
82 recv_count: 0,
83 sent_count: 0,
84 is_limiting: false,
85 limit,
86 buffer: VecDeque::new(),
87 timestamps: VecDeque::with_capacity(limit),
88 clock,
89 interval,
90 timer_name,
91 output_send,
92 output_drop,
93 }
94 }
95
96 #[inline]
102 pub fn set_timer(&mut self, callback: Option<TimeEventCallback>) {
103 let delta = self.delta_next();
104 let mut clock = self.clock.borrow_mut();
105 if clock.timer_names().contains(&self.timer_name.as_str()) {
106 clock.cancel_timer(&self.timer_name);
107 }
108 let alert_ts = clock.timestamp_ns() + delta;
109
110 clock
111 .set_time_alert_ns(&self.timer_name, alert_ts, callback)
112 .expect(FAILED);
113 }
114
115 #[inline]
117 pub fn delta_next(&mut self) -> u64 {
118 match self.timestamps.get(self.limit - 1) {
119 Some(ts) => {
120 let diff = self.clock.borrow().timestamp_ns().as_u64() - ts.as_u64();
121 self.interval.saturating_sub(diff)
122 }
123 None => 0,
124 }
125 }
126
127 #[inline]
129 pub fn reset(&mut self) {
130 self.buffer.clear();
131 self.recv_count = 0;
132 self.sent_count = 0;
133 self.is_limiting = false;
134 self.timestamps.clear();
135 }
136
137 #[inline]
139 pub fn used(&self) -> f64 {
140 if self.timestamps.is_empty() {
141 return 0.0;
142 }
143
144 let now = self.clock.borrow().timestamp_ns().as_i64();
145 let interval_start = now - self.interval as i64;
146
147 let messages_in_current_interval = self
148 .timestamps
149 .iter()
150 .take_while(|&&ts| ts.as_i64() > interval_start)
151 .count();
152
153 (messages_in_current_interval as f64) / (self.limit as f64)
154 }
155
156 #[inline]
158 pub fn qsize(&self) -> usize {
159 self.buffer.len()
160 }
161}
162
163impl<T, F> InnerThrottler<T, F>
164where
165 T: 'static,
166 F: Fn(T) + 'static,
167{
168 #[inline]
169 pub fn send_msg(&mut self, msg: T) {
170 let now = self.clock.borrow().timestamp_ns();
171
172 if self.timestamps.len() >= self.limit {
173 self.timestamps.pop_back();
174 }
175 self.timestamps.push_front(now);
176
177 (self.output_send)(msg);
178 self.sent_count += 1;
179 }
180
181 #[inline]
182 pub fn limit_msg(&mut self, msg: T, throttler: Throttler<T, F>) {
183 let callback = if self.output_drop.is_none() {
184 self.buffer.push_front(msg);
185 log::debug!("Buffering {}", self.buffer.len());
186 Some(throttler.get_process_callback().into())
187 } else {
188 log::debug!("Dropping");
189 if let Some(drop) = &self.output_drop {
190 drop(msg);
191 }
192 Some(throttler.get_resume_callback().into())
193 };
194 if !self.is_limiting {
195 log::debug!("Limiting");
196 self.set_timer(callback);
197 self.is_limiting = true;
198 }
199 }
200}