nautilus_hyperliquid/http/
rate_limits.rs1use std::time::{Duration, Instant};
17
18use serde_json::Value;
19use tokio::sync::Mutex;
20
21#[derive(Debug)]
22pub struct WeightedLimiter {
23 capacity: f64, refill_per_sec: f64, state: Mutex<State>,
26}
27
/// Mutable bookkeeping for a limiter: the current balance and when it was
/// last topped up.
#[derive(Debug)]
struct State {
    /// Tokens currently available (fractional, because refills are
    /// proportional to elapsed time).
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date.
    last_refill: Instant,
}
33
34impl WeightedLimiter {
35 pub fn per_minute(capacity: u32) -> Self {
36 let cap = capacity as f64;
37 Self {
38 capacity: cap,
39 refill_per_sec: cap / 60.0,
40 state: Mutex::new(State {
41 tokens: cap,
42 last_refill: Instant::now(),
43 }),
44 }
45 }
46
47 pub async fn acquire(&self, weight: u32) {
49 let need = weight as f64;
50 loop {
51 let mut st = self.state.lock().await;
52 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
53
54 if st.tokens >= need {
55 st.tokens -= need;
56 return;
57 }
58 let deficit = need - st.tokens;
59 let secs = deficit / self.refill_per_sec;
60 drop(st);
61 tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
62 }
63 }
64
65 pub async fn debit_extra(&self, extra: u32) {
67 if extra == 0 {
68 return;
69 }
70 let mut st = self.state.lock().await;
71 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
72 st.tokens = (st.tokens - extra as f64).max(0.0);
73 }
74
75 pub async fn snapshot(&self) -> RateLimitSnapshot {
76 let mut st = self.state.lock().await;
77 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
78 RateLimitSnapshot {
79 capacity: self.capacity as u32,
80 tokens: st.tokens.max(0.0) as u32,
81 }
82 }
83
84 fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
85 let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
86 if dt > 0.0 {
87 st.tokens = (st.tokens + dt * per_sec).min(cap);
88 st.last_refill = Instant::now();
89 }
90 }
91}
92
/// Point-in-time, whole-token view of a limiter's capacity and balance.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    /// Maximum number of tokens the limiter can hold.
    pub capacity: u32,
    /// Tokens available when the snapshot was taken.
    pub tokens: u32,
}
98
/// Computes an exponential backoff delay with full jitter.
///
/// The jitter window is `base * 2^attempt` milliseconds (the exponent is
/// clamped to 16 and the multiplication saturates), limited to `cap`. The
/// returned delay is drawn from `[0, window)` at millisecond resolution.
///
/// `cap` is a hard upper bound, also when it is smaller than `base`.
///
/// Returns [`Duration::ZERO`] when the window is empty (for example when
/// `base` or `cap` is below one millisecond) rather than panicking on a zero
/// modulus.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    use std::{
        collections::hash_map::RandomState,
        hash::{BuildHasher, Hash, Hasher},
    };

    // Saturate instead of silently truncating the u128 millisecond counts.
    let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX);
    let cap_ms = u64::try_from(cap.as_millis()).unwrap_or(u64::MAX);

    let window_ms = base_ms
        .saturating_mul(1u64 << attempt.min(16))
        .min(cap_ms);
    if window_ms == 0 {
        return Duration::ZERO;
    }

    // Every `RandomState` carries its own randomly seeded keys, so the hash
    // differs from call to call even for the same `attempt`. This provides
    // the jitter without an external RNG dependency.
    let mut hasher = RandomState::new().build_hasher();
    attempt.hash(&mut hasher);

    Duration::from_millis(hasher.finish() % window_ms)
}
117
118pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
121 match req.request_type.as_str() {
122 "l2Book"
124 | "allMids"
125 | "clearinghouseState"
126 | "orderStatus"
127 | "spotClearinghouseState"
128 | "exchangeStatus" => 2,
129 "userRole" => 60,
131 _ => 20,
133 }
134}
135
136pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
139 let items = match json {
140 Value::Array(a) => a.len(),
141 Value::Object(m) => m
142 .values()
143 .filter_map(|v| v.as_array().map(|a| a.len()))
144 .max()
145 .unwrap_or(0),
146 _ => 0,
147 };
148
149 let unit = match req.request_type.as_str() {
150 "candleSnapshot" => 60usize, "recentTrades"
152 | "historicalOrders"
153 | "userFills"
154 | "userFillsByTime"
155 | "fundingHistory"
156 | "userFunding"
157 | "nonUserFundingUpdates"
158 | "twapHistory"
159 | "userTwapSliceFills"
160 | "userTwapSliceFillsByTime"
161 | "delegatorHistory"
162 | "delegatorRewards"
163 | "validatorStats" => 20usize, _ => return 0,
165 };
166 (items / unit) as u32
167}
168
169pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
171 let batch_size = match action.action_type.as_str() {
174 "order" => {
175 if let Some(orders) = action.params.get("orders") {
176 orders.as_array().map(|a| a.len()).unwrap_or(0)
177 } else {
178 0
179 }
180 }
181 "cancel" => {
182 if let Some(cancels) = action.params.get("cancels") {
183 cancels.as_array().map(|a| a.len()).unwrap_or(0)
184 } else {
185 0
186 }
187 }
188 "batchModify" => {
189 if let Some(modifies) = action.params.get("modifies") {
190 modifies.as_array().map(|a| a.len()).unwrap_or(0)
191 } else {
192 0
193 }
194 }
195 _ => 0,
196 };
197 1 + (batch_size as u32 / 40)
198}
199
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;
    use crate::http::query::{ExchangeAction, InfoRequest};

    /// Builds an info request of the given type with the supplied params.
    fn info_request(request_type: &str, params: Value) -> InfoRequest {
        InfoRequest {
            request_type: request_type.to_string(),
            params,
        }
    }

    // Batched actions cost 1 plus one extra unit per 40 entries.
    #[rstest]
    #[case("order", "orders", 1, 1)]
    #[case("order", "orders", 39, 1)]
    #[case("order", "orders", 40, 2)]
    #[case("order", "orders", 79, 2)]
    #[case("order", "orders", 80, 3)]
    #[case("cancel", "cancels", 40, 2)]
    #[case("batchModify", "modifies", 40, 2)]
    fn test_exchange_weight_steps_every_40(
        #[case] action_type: &str,
        #[case] array_key: &str,
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        let entries = vec![1; array_len];
        let action = ExchangeAction {
            action_type: action_type.to_string(),
            params: json!({ array_key: entries }),
        };

        let weight = exchange_weight(&action);

        assert_eq!(weight, expected_weight);
    }

    // Actions without a batch array always cost exactly 1.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let action = ExchangeAction {
            action_type: "updateLeverage".to_string(),
            params: json!({ "asset": 1, "isCross": true, "leverage": 10 }),
        };

        assert_eq!(exchange_weight(&action), 1);
    }

    #[rstest]
    #[case("l2Book", 2)]
    #[case("allMids", 2)]
    #[case("clearinghouseState", 2)]
    #[case("orderStatus", 2)]
    #[case("spotClearinghouseState", 2)]
    #[case("exchangeStatus", 2)]
    #[case("userRole", 60)]
    #[case("userFills", 20)]
    #[case("unknownEndpoint", 20)]
    fn test_info_base_weights(#[case] request_type: &str, #[case] expected_weight: u32) {
        let request = info_request(request_type, json!({ "coin": "BTC" }));

        assert_eq!(info_base_weight(&request), expected_weight);
    }

    // Request types outside the per-item table are never charged extra,
    // however large the response is.
    #[rstest]
    fn test_info_extra_weight_no_charging() {
        let request = info_request("l2Book", json!({ "coin": "BTC" }));
        let response = json!(vec![1; 1000]);

        assert_eq!(info_extra_weight(&request, &response), 0);
    }

    // For object responses the longest array member drives the charge:
    // 40 items at 20 items per unit gives 2.
    #[rstest]
    fn test_info_extra_weight_complex_json() {
        let request = info_request("userFills", json!({ "user": "0x123" }));
        let response = json!({
            "fills": vec![1; 40],
            "orders": vec![1; 20],
            "other": "data"
        });

        assert_eq!(info_extra_weight(&request, &response), 2);
    }

    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // 60 acquisitions of weight 20 drain the full 1200-token bucket.
        for _ in 0..60 {
            limiter.acquire(20).await;
        }

        // The next acquisition has to wait for the bucket to refill.
        let started = std::time::Instant::now();
        limiter.acquire(20).await;
        let waited = started.elapsed();

        assert!(
            waited.as_millis() >= 500,
            "Expected significant delay, got {}ms",
            waited.as_millis()
        );
    }

    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        // A fresh limiter starts with a full bucket.
        let initial = limiter.snapshot().await;
        assert_eq!(initial.capacity, 100);
        assert_eq!(initial.tokens, 100);

        limiter.acquire(30).await;
        let after_acquire = limiter.snapshot().await;
        assert_eq!(after_acquire.tokens, 70);

        limiter.debit_extra(20).await;
        let after_debit = limiter.snapshot().await;
        assert_eq!(after_debit.tokens, 50);

        // Debiting more than the remaining balance floors it at zero.
        limiter.debit_extra(100).await;
        let after_overdraw = limiter.snapshot().await;
        assert_eq!(after_overdraw.tokens, 0);
    }

    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay = backoff_full_jitter(attempt, base, cap);

        // The jitter window doubles per attempt; the delay must stay inside it.
        assert!(delay.as_millis() <= max_expected_ms as u128);
    }

    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay = backoff_full_jitter(10, base, cap);

        assert!(delay.as_millis() <= cap.as_millis());
    }
}