nautilus_hyperliquid/http/
rate_limits.rs1use std::time::{Duration, Instant};
17
18use serde_json::Value;
19use tokio::sync::Mutex;
20
21#[derive(Debug)]
22pub struct WeightedLimiter {
23 capacity: f64, refill_per_sec: f64, state: Mutex<State>,
26}
27
/// Mutable part of the limiter: the token balance and its last refill time.
#[derive(Debug)]
struct State {
    /// Tokens currently available to spend.
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date.
    last_refill: Instant,
}
33
34impl WeightedLimiter {
35 pub fn per_minute(capacity: u32) -> Self {
36 let cap = capacity as f64;
37 Self {
38 capacity: cap,
39 refill_per_sec: cap / 60.0,
40 state: Mutex::new(State {
41 tokens: cap,
42 last_refill: Instant::now(),
43 }),
44 }
45 }
46
47 pub async fn acquire(&self, weight: u32) {
49 let need = weight as f64;
50 loop {
51 let mut st = self.state.lock().await;
52 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
53
54 if st.tokens >= need {
55 st.tokens -= need;
56 return;
57 }
58 let deficit = need - st.tokens;
59 let secs = deficit / self.refill_per_sec;
60 drop(st);
61 tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
62 }
63 }
64
65 pub async fn debit_extra(&self, extra: u32) {
67 if extra == 0 {
68 return;
69 }
70 let mut st = self.state.lock().await;
71 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
72 st.tokens = (st.tokens - extra as f64).max(0.0);
73 }
74
75 pub async fn snapshot(&self) -> RateLimitSnapshot {
76 let mut st = self.state.lock().await;
77 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
78 RateLimitSnapshot {
79 capacity: self.capacity as u32,
80 tokens: st.tokens.max(0.0) as u32,
81 }
82 }
83
84 fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
85 let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
86 if dt > 0.0 {
87 st.tokens = (st.tokens + dt * per_sec).min(cap);
88 st.last_refill = Instant::now();
89 }
90 }
91}
92
/// Point-in-time view of a limiter, expressed in whole tokens.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    /// Maximum number of tokens the limiter can hold.
    pub capacity: u32,
    /// Tokens that were available when the snapshot was taken.
    pub tokens: u32,
}
98
/// Returns a randomized retry delay for the given attempt ("full jitter" backoff).
///
/// The upper bound, in milliseconds, is `base * 2^min(attempt, 16)`, limited to `cap` and
/// then raised to at least `base`. The returned delay is a whole number of milliseconds in
/// `[0, bound)`.
///
/// When the bound is zero (a `base` shorter than one millisecond) the function returns
/// `Duration::ZERO` rather than taking a remainder by zero.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    use std::{
        collections::hash_map::RandomState,
        hash::{BuildHasher, Hash, Hasher},
    };

    // Saturate instead of truncating if a duration does not fit in 64 bits of milliseconds.
    let base_ms = u64::try_from(base.as_millis()).unwrap_or(u64::MAX);
    let cap_ms = u64::try_from(cap.as_millis()).unwrap_or(u64::MAX);

    let max = base_ms
        .saturating_mul(1u64 << attempt.min(16))
        .min(cap_ms)
        .max(base_ms);
    if max == 0 {
        return Duration::ZERO;
    }

    // `RandomState` is randomly keyed, so the hash differs between calls and processes.
    // A fixed-key hasher fed with a near-zero elapsed time would give almost the same
    // "jitter" for every caller on the same attempt.
    let mut hasher = RandomState::new().build_hasher();
    attempt.hash(&mut hasher);
    Duration::from_millis(hasher.finish() % max)
}
117
118pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
121 match req.request_type.as_str() {
122 "l2Book"
124 | "allMids"
125 | "clearinghouseState"
126 | "orderStatus"
127 | "spotClearinghouseState"
128 | "exchangeStatus" => 2,
129 "userRole" => 60,
131 _ => 20,
133 }
134}
135
136pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
139 let items = match json {
140 Value::Array(a) => a.len(),
141 Value::Object(m) => m
142 .values()
143 .filter_map(|v| v.as_array().map(|a| a.len()))
144 .max()
145 .unwrap_or(0),
146 _ => 0,
147 };
148
149 let unit = match req.request_type.as_str() {
150 "candleSnapshot" => 60usize, "recentTrades"
152 | "historicalOrders"
153 | "userFills"
154 | "userFillsByTime"
155 | "fundingHistory"
156 | "userFunding"
157 | "nonUserFundingUpdates"
158 | "twapHistory"
159 | "userTwapSliceFills"
160 | "userTwapSliceFillsByTime"
161 | "delegatorHistory"
162 | "delegatorRewards"
163 | "validatorStats" => 20usize, _ => return 0,
165 };
166 (items / unit) as u32
167}
168
169pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
171 use crate::http::query::ExchangeActionParams;
172
173 let batch_size = match &action.params {
175 ExchangeActionParams::Order(params) => params.orders.len(),
176 ExchangeActionParams::Cancel(params) => params.cancels.len(),
177 ExchangeActionParams::Modify(_) => {
178 1
180 }
181 ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
182 0
183 }
184 };
185 1 + (batch_size as u32 / 40)
186}
187
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, InfoRequest,
        InfoRequestParams, L2BookParams, OrderParams, UpdateLeverageParams, UserFillsParams,
    };

    /// Builds an info request with the given request type and L2 book params for `BTC`.
    fn l2_book_request(request_type: &str) -> InfoRequest {
        InfoRequest {
            request_type: request_type.to_string(),
            params: InfoRequestParams::L2Book(L2BookParams {
                coin: "BTC".to_string(),
            }),
        }
    }

    /// Order batches cost one extra unit of weight for every full group of 40 orders.
    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        use rust_decimal::Decimal;

        use super::super::models::{
            Cloid, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
            HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
        };

        let make_order = || HyperliquidExecPlaceOrderRequest {
            asset: 0,
            is_buy: true,
            price: Decimal::new(50000, 0),
            size: Decimal::new(1, 0),
            reduce_only: false,
            kind: HyperliquidExecOrderKind::Limit {
                limit: HyperliquidExecLimitParams {
                    tif: HyperliquidExecTif::Gtc,
                },
            },
            cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
        };
        let orders: Vec<HyperliquidExecPlaceOrderRequest> =
            std::iter::repeat_with(make_order).take(array_len).collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params: ExchangeActionParams::Order(OrderParams {
                orders,
                grouping: "na".to_string(),
            }),
        };

        assert_eq!(exchange_weight(&action), expected_weight);
    }

    /// A batch of exactly 40 cancels reaches the second weight step.
    #[rstest]
    fn test_exchange_weight_cancel() {
        use super::super::models::{Cloid, HyperliquidExecCancelByCloidRequest};

        let make_cancel = || HyperliquidExecCancelByCloidRequest {
            asset: 0,
            cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
        };
        let cancels: Vec<HyperliquidExecCancelByCloidRequest> =
            std::iter::repeat_with(make_cancel).take(40).collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };

        assert_eq!(exchange_weight(&action), 2);
    }

    /// Actions without a batch are charged the minimum weight.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let params = ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
            asset: 1,
            is_cross: true,
            leverage: 10,
        });
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params,
        };

        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    /// The base weight depends only on the request type string.
    #[rstest]
    #[case("l2Book", 2)]
    #[case("allMids", 2)]
    #[case("clearinghouseState", 2)]
    #[case("orderStatus", 2)]
    #[case("spotClearinghouseState", 2)]
    #[case("exchangeStatus", 2)]
    #[case("userRole", 60)]
    #[case("userFills", 20)]
    #[case("unknownEndpoint", 20)]
    fn test_info_base_weights(#[case] request_type: &str, #[case] expected_weight: u32) {
        let request = l2_book_request(request_type);

        assert_eq!(info_base_weight(&request), expected_weight);
    }

    /// Request types outside the per-item list add no extra weight, however large the payload.
    #[rstest]
    fn test_info_extra_weight_no_charging() {
        let l2_book = l2_book_request("l2Book");
        let large_json = json!(vec![1; 1000]);

        assert_eq!(info_extra_weight(&l2_book, &large_json), 0);
    }

    /// For an object payload the longest array among its values drives the extra weight.
    #[rstest]
    fn test_info_extra_weight_complex_json() {
        let user_fills = InfoRequest {
            request_type: "userFills".to_string(),
            params: InfoRequestParams::UserFills(UserFillsParams {
                user: "0x123".to_string(),
            }),
        };
        let complex_json = json!({
            "fills": vec![1; 40],
            "orders": vec![1; 20],
            "other": "data"
        });

        assert_eq!(info_extra_weight(&user_fills, &complex_json), 2);
    }

    /// Once the bucket is drained, the next acquire has to wait for the refill.
    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // 60 acquisitions of weight 20 spend the full 1200-token bucket.
        for _ in 0..60 {
            limiter.acquire(20).await;
        }

        let started = std::time::Instant::now();
        limiter.acquire(20).await;
        let waited_ms = started.elapsed().as_millis();

        assert!(
            waited_ms >= 500,
            "Expected significant delay, got {}ms",
            waited_ms
        );
    }

    /// `debit_extra` lowers the balance and floors it at zero.
    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        let initial = limiter.snapshot().await;
        assert_eq!(initial.capacity, 100);
        assert_eq!(initial.tokens, 100);

        limiter.acquire(30).await;
        assert_eq!(limiter.snapshot().await.tokens, 70);

        limiter.debit_extra(20).await;
        assert_eq!(limiter.snapshot().await.tokens, 50);

        // A debit larger than the remaining balance empties the bucket.
        limiter.debit_extra(100).await;
        assert_eq!(limiter.snapshot().await.tokens, 0);
    }

    /// The delay never exceeds the exponential bound for the attempt.
    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let delay =
            backoff_full_jitter(attempt, Duration::from_millis(100), Duration::from_secs(5));

        assert!(delay.as_millis() <= u128::from(max_expected_ms));
    }

    /// Late attempts stay within the configured cap.
    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let cap = Duration::from_secs(5);

        let delay_high = backoff_full_jitter(10, Duration::from_millis(100), cap);

        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}