nautilus_hyperliquid/http/
rate_limits.rs1use std::time::{Duration, Instant};
17
18use serde_json::Value;
19
20#[derive(Debug)]
21pub struct WeightedLimiter {
22 capacity: f64, refill_per_sec: f64, state: tokio::sync::Mutex<State>,
25}
26
27#[derive(Debug)]
28struct State {
29 tokens: f64,
30 last_refill: Instant,
31}
32
33impl WeightedLimiter {
34 pub fn per_minute(capacity: u32) -> Self {
35 let cap = capacity as f64;
36 Self {
37 capacity: cap,
38 refill_per_sec: cap / 60.0,
39 state: tokio::sync::Mutex::new(State {
40 tokens: cap,
41 last_refill: Instant::now(),
42 }),
43 }
44 }
45
46 pub async fn acquire(&self, weight: u32) {
48 let need = weight as f64;
49 loop {
50 let mut st = self.state.lock().await;
51 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
52
53 if st.tokens >= need {
54 st.tokens -= need;
55 return;
56 }
57 let deficit = need - st.tokens;
58 let secs = deficit / self.refill_per_sec;
59 drop(st);
60 tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
61 }
62 }
63
64 pub async fn debit_extra(&self, extra: u32) {
66 if extra == 0 {
67 return;
68 }
69 let mut st = self.state.lock().await;
70 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
71 st.tokens = (st.tokens - extra as f64).max(0.0);
72 }
73
74 pub async fn snapshot(&self) -> RateLimitSnapshot {
75 let mut st = self.state.lock().await;
76 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
77 RateLimitSnapshot {
78 capacity: self.capacity as u32,
79 tokens: st.tokens.max(0.0) as u32,
80 }
81 }
82
83 fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
84 let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
85 if dt > 0.0 {
86 st.tokens = (st.tokens + dt * per_sec).min(cap);
87 st.last_refill = Instant::now();
88 }
89 }
90}
91
92#[derive(Debug, Clone, Copy)]
93pub struct RateLimitSnapshot {
94 pub capacity: u32,
95 pub tokens: u32,
96}
97
98pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
99 use std::{
100 collections::hash_map::DefaultHasher,
101 hash::{Hash, Hasher},
102 };
103
104 let mut hasher = DefaultHasher::new();
106 attempt.hash(&mut hasher);
107 Instant::now().elapsed().as_nanos().hash(&mut hasher);
108 let hash = hasher.finish();
109
110 let max = (base.as_millis() as u64)
111 .saturating_mul(1u64 << attempt.min(16))
112 .min(cap.as_millis() as u64)
113 .max(base.as_millis() as u64);
114 Duration::from_millis(hash % max)
115}
116
117pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
120 match req.request_type.as_str() {
121 "l2Book"
123 | "allMids"
124 | "clearinghouseState"
125 | "orderStatus"
126 | "spotClearinghouseState"
127 | "exchangeStatus" => 2,
128 "userRole" => 60,
130 _ => 20,
132 }
133}
134
135pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
138 let items = match json {
139 Value::Array(a) => a.len(),
140 Value::Object(m) => m
141 .values()
142 .filter_map(|v| v.as_array().map(|a| a.len()))
143 .max()
144 .unwrap_or(0),
145 _ => 0,
146 };
147
148 let unit = match req.request_type.as_str() {
149 "candleSnapshot" => 60usize, "recentTrades"
151 | "historicalOrders"
152 | "userFills"
153 | "userFillsByTime"
154 | "fundingHistory"
155 | "userFunding"
156 | "nonUserFundingUpdates"
157 | "twapHistory"
158 | "userTwapSliceFills"
159 | "userTwapSliceFillsByTime"
160 | "delegatorHistory"
161 | "delegatorRewards"
162 | "validatorStats" => 20usize, _ => return 0,
164 };
165 (items / unit) as u32
166}
167
168pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
170 use crate::http::query::ExchangeActionParams;
171
172 let batch_size = match &action.params {
174 ExchangeActionParams::Order(params) => params.orders.len(),
175 ExchangeActionParams::Cancel(params) => params.cancels.len(),
176 ExchangeActionParams::Modify(_) => {
177 1
179 }
180 ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
181 0
182 }
183 };
184 1 + (batch_size as u32 / 40)
185}
186
187#[cfg(test)]
188mod tests {
189 use rstest::rstest;
190
191 use super::*;
192 use crate::http::query::{
193 CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
194 UpdateLeverageParams,
195 };
196
197 #[rstest]
198 #[case(1, 1)]
199 #[case(39, 1)]
200 #[case(40, 2)]
201 #[case(79, 2)]
202 #[case(80, 3)]
203 fn test_exchange_weight_order_steps_every_40(
204 #[case] array_len: usize,
205 #[case] expected_weight: u32,
206 ) {
207 use rust_decimal::Decimal;
208
209 use super::super::models::{
210 Cloid, HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
211 HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
212 };
213
214 let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
215 .map(|_| HyperliquidExecPlaceOrderRequest {
216 asset: 0,
217 is_buy: true,
218 price: Decimal::new(50000, 0),
219 size: Decimal::new(1, 0),
220 reduce_only: false,
221 kind: HyperliquidExecOrderKind::Limit {
222 limit: HyperliquidExecLimitParams {
223 tif: HyperliquidExecTif::Gtc,
224 },
225 },
226 cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
227 })
228 .collect();
229
230 let action = ExchangeAction {
231 action_type: ExchangeActionType::Order,
232 params: ExchangeActionParams::Order(OrderParams {
233 orders,
234 grouping: HyperliquidExecGrouping::Na,
235 }),
236 };
237 assert_eq!(exchange_weight(&action), expected_weight);
238 }
239
240 #[rstest]
241 fn test_exchange_weight_cancel() {
242 use super::super::models::{Cloid, HyperliquidExecCancelByCloidRequest};
243
244 let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
245 .map(|_| HyperliquidExecCancelByCloidRequest {
246 asset: 0,
247 cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
248 })
249 .collect();
250
251 let action = ExchangeAction {
252 action_type: ExchangeActionType::Cancel,
253 params: ExchangeActionParams::Cancel(CancelParams { cancels }),
254 };
255 assert_eq!(exchange_weight(&action), 2);
256 }
257
258 #[rstest]
259 fn test_exchange_weight_non_batch_action() {
260 let update_leverage = ExchangeAction {
261 action_type: ExchangeActionType::UpdateLeverage,
262 params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
263 asset: 1,
264 is_cross: true,
265 leverage: 10,
266 }),
267 };
268 assert_eq!(exchange_weight(&update_leverage), 1);
269 }
270
271 #[tokio::test]
272 async fn test_limiter_roughly_caps_to_capacity() {
273 let limiter = WeightedLimiter::per_minute(1200);
274
275 for _ in 0..60 {
277 limiter.acquire(20).await; }
279
280 let t0 = std::time::Instant::now();
282 limiter.acquire(20).await;
283 let elapsed = t0.elapsed();
284
285 assert!(
287 elapsed.as_millis() >= 500,
288 "Expected significant delay, was {}ms",
289 elapsed.as_millis()
290 );
291 }
292
293 #[tokio::test]
294 async fn test_limiter_debit_extra_works() {
295 let limiter = WeightedLimiter::per_minute(100);
296
297 let snapshot = limiter.snapshot().await;
299 assert_eq!(snapshot.capacity, 100);
300 assert_eq!(snapshot.tokens, 100);
301
302 limiter.acquire(30).await;
304 let snapshot = limiter.snapshot().await;
305 assert_eq!(snapshot.tokens, 70);
306
307 limiter.debit_extra(20).await;
309 let snapshot = limiter.snapshot().await;
310 assert_eq!(snapshot.tokens, 50);
311
312 limiter.debit_extra(100).await;
314 let snapshot = limiter.snapshot().await;
315 assert_eq!(snapshot.tokens, 0);
316 }
317
318 #[rstest]
319 #[case(0, 100)]
320 #[case(1, 200)]
321 #[case(2, 400)]
322 fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
323 let base = Duration::from_millis(100);
324 let cap = Duration::from_secs(5);
325
326 let delay = backoff_full_jitter(attempt, base, cap);
327
328 assert!(delay.as_millis() <= max_expected_ms as u128);
330 }
331
332 #[rstest]
333 fn test_backoff_full_jitter_respects_cap() {
334 let base = Duration::from_millis(100);
335 let cap = Duration::from_secs(5);
336
337 let delay_high = backoff_full_jitter(10, base, cap);
338 assert!(delay_high.as_millis() <= cap.as_millis());
339 }
340}