nautilus_hyperliquid/http/
rate_limits.rs1use std::time::{Duration, Instant};
17
18use serde_json::Value;
19
20#[derive(Debug)]
21pub struct WeightedLimiter {
22 capacity: f64, refill_per_sec: f64, state: tokio::sync::Mutex<State>,
25}
26
/// Mutable part of the token bucket, kept behind the limiter's mutex.
#[derive(Debug)]
struct State {
    /// Tokens currently available for consumption.
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date by a refill.
    last_refill: Instant,
}
32
33impl WeightedLimiter {
34 pub fn per_minute(capacity: u32) -> Self {
35 let cap = capacity as f64;
36 Self {
37 capacity: cap,
38 refill_per_sec: cap / 60.0,
39 state: tokio::sync::Mutex::new(State {
40 tokens: cap,
41 last_refill: Instant::now(),
42 }),
43 }
44 }
45
46 pub async fn acquire(&self, weight: u32) {
48 let need = weight as f64;
49 loop {
50 let mut st = self.state.lock().await;
51 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
52
53 if st.tokens >= need {
54 st.tokens -= need;
55 return;
56 }
57 let deficit = need - st.tokens;
58 let secs = deficit / self.refill_per_sec;
59 drop(st);
60 tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
61 }
62 }
63
64 pub async fn debit_extra(&self, extra: u32) {
66 if extra == 0 {
67 return;
68 }
69 let mut st = self.state.lock().await;
70 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
71 st.tokens = (st.tokens - extra as f64).max(0.0);
72 }
73
74 pub async fn snapshot(&self) -> RateLimitSnapshot {
75 let mut st = self.state.lock().await;
76 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
77 RateLimitSnapshot {
78 capacity: self.capacity as u32,
79 tokens: st.tokens.max(0.0) as u32,
80 }
81 }
82
83 fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
84 let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
85 if dt > 0.0 {
86 st.tokens = (st.tokens + dt * per_sec).min(cap);
87 st.last_refill = Instant::now();
88 }
89 }
90}
91
/// Point-in-time view of a limiter's bucket, expressed in whole tokens.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    /// Maximum number of tokens the bucket can hold.
    pub capacity: u32,
    /// Tokens available when the snapshot was taken.
    pub tokens: u32,
}
97
/// Computes a retry delay using exponential backoff with full jitter.
///
/// The upper bound for `attempt` is `base * 2^attempt` (exponent capped at 16), limited
/// to `cap` but never below `base`, all at millisecond resolution. The returned delay is
/// a whole number of milliseconds drawn from `[0, upper bound)`.
///
/// Returns [`Duration::ZERO`] when the upper bound is zero milliseconds (for example a
/// sub-millisecond `base`), where a modulo by zero would otherwise panic.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    use std::{
        collections::hash_map::RandomState,
        hash::{BuildHasher, Hash, Hasher},
    };

    // Saturate instead of wrapping when a duration exceeds `u64::MAX` milliseconds.
    let to_millis = |d: Duration| d.as_millis().min(u128::from(u64::MAX)) as u64;
    let base_ms = to_millis(base);

    let max = base_ms
        .saturating_mul(1u64 << attempt.min(16))
        .min(to_millis(cap))
        .max(base_ms);
    if max == 0 {
        return Duration::ZERO;
    }

    // Each `RandomState` carries its own randomly seeded keys, so successive calls
    // spread over the whole range. A fixed-key hasher fed a near-zero elapsed time
    // yields almost the same delay on every call.
    let mut hasher = RandomState::new().build_hasher();
    attempt.hash(&mut hasher);
    Duration::from_millis(hasher.finish() % max)
}
116
117pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
120 match req.request_type.as_str() {
121 "l2Book"
123 | "allMids"
124 | "clearinghouseState"
125 | "orderStatus"
126 | "spotClearinghouseState"
127 | "exchangeStatus" => 2,
128 "userRole" => 60,
130 _ => 20,
132 }
133}
134
135pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
138 let items = match json {
139 Value::Array(a) => a.len(),
140 Value::Object(m) => m
141 .values()
142 .filter_map(|v| v.as_array().map(|a| a.len()))
143 .max()
144 .unwrap_or(0),
145 _ => 0,
146 };
147
148 let unit = match req.request_type.as_str() {
149 "candleSnapshot" => 60usize, "recentTrades"
151 | "historicalOrders"
152 | "userFills"
153 | "userFillsByTime"
154 | "fundingHistory"
155 | "userFunding"
156 | "nonUserFundingUpdates"
157 | "twapHistory"
158 | "userTwapSliceFills"
159 | "userTwapSliceFillsByTime"
160 | "delegatorHistory"
161 | "delegatorRewards"
162 | "validatorStats" => 20usize, _ => return 0,
164 };
165 (items / unit) as u32
166}
167
168pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
170 use crate::http::query::ExchangeActionParams;
171
172 let batch_size = match &action.params {
174 ExchangeActionParams::Order(params) => params.orders.len(),
175 ExchangeActionParams::Cancel(params) => params.cancels.len(),
176 ExchangeActionParams::Modify(_) => {
177 1
179 }
180 ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
181 0
182 }
183 };
184 1 + (batch_size as u32 / 40)
185}
186
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
        UpdateLeverageParams,
    };

    // Order batches cost 1 plus one extra unit per full 40 orders.
    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        use rust_decimal::Decimal;

        use super::super::models::{
            Cloid, HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
            HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
        };

        let mut orders: Vec<HyperliquidExecPlaceOrderRequest> = Vec::with_capacity(array_len);
        for _ in 0..array_len {
            orders.push(HyperliquidExecPlaceOrderRequest {
                asset: 0,
                is_buy: true,
                price: Decimal::new(50000, 0),
                size: Decimal::new(1, 0),
                reduce_only: false,
                kind: HyperliquidExecOrderKind::Limit {
                    limit: HyperliquidExecLimitParams {
                        tif: HyperliquidExecTif::Gtc,
                    },
                },
                cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
            });
        }

        let params = ExchangeActionParams::Order(OrderParams {
            orders,
            grouping: HyperliquidExecGrouping::Na,
            builder: None,
        });
        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params,
        };

        assert_eq!(exchange_weight(&action), expected_weight);
    }

    // Forty cancels cross the first 40-entry step.
    #[rstest]
    fn test_exchange_weight_cancel() {
        use super::super::models::{Cloid, HyperliquidExecCancelByCloidRequest};

        let make_cancel = || HyperliquidExecCancelByCloidRequest {
            asset: 0,
            cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
        };
        let cancels: Vec<HyperliquidExecCancelByCloidRequest> =
            std::iter::repeat_with(make_cancel).take(40).collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };

        assert_eq!(exchange_weight(&action), 2);
    }

    // Actions without a batch cost the flat base weight.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let params = ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
            asset: 1,
            is_cross: true,
            leverage: 10,
        });
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params,
        };

        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    // After draining the full bucket, the next acquire has to wait for a refill.
    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // 60 requests of weight 20 consume the whole 1200-token bucket.
        let mut remaining = 60;
        while remaining > 0 {
            limiter.acquire(20).await;
            remaining -= 1;
        }

        let started = std::time::Instant::now();
        limiter.acquire(20).await;
        let waited_ms = started.elapsed().as_millis();

        assert!(
            waited_ms >= 500,
            "Expected significant delay, was {}ms",
            waited_ms
        );
    }

    // Debits reduce the balance and saturate at zero.
    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        let initial = limiter.snapshot().await;
        assert_eq!(initial.capacity, 100);
        assert_eq!(initial.tokens, 100);

        limiter.acquire(30).await;
        let after_acquire = limiter.snapshot().await;
        assert_eq!(after_acquire.tokens, 70);

        limiter.debit_extra(20).await;
        let after_debit = limiter.snapshot().await;
        assert_eq!(after_debit.tokens, 50);

        // Debiting more than the balance must not go below zero.
        limiter.debit_extra(100).await;
        let after_overdraw = limiter.snapshot().await;
        assert_eq!(after_overdraw.tokens, 0);
    }

    // The delay never exceeds the exponential bound for the attempt.
    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let delay = backoff_full_jitter(
            attempt,
            Duration::from_millis(100),
            Duration::from_secs(5),
        );

        let upper_bound_ms = u128::from(max_expected_ms);
        assert!(delay.as_millis() <= upper_bound_ms);
    }

    // A high attempt count is still bounded by the cap.
    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let cap = Duration::from_secs(5);

        let delay_high = backoff_full_jitter(10, Duration::from_millis(100), cap);

        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}