nautilus_hyperliquid/http/rate_limits.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::time::{Duration, Instant};

use serde_json::Value;
use tokio::sync::Mutex;

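/// Weighted token-bucket limiter: the bucket holds at most `capacity` tokens and refills
/// continuously at `capacity / 60` tokens per second, capping sustained usage at `capacity`
/// weight units per minute.
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doc-test; `req` and
/// `send_request` are placeholders for the caller's request and transport):
///
/// ```ignore
/// let limiter = WeightedLimiter::per_minute(1200);
///
/// // Wait for the base weight of the request, then send it.
/// limiter.acquire(info_base_weight(&req)).await;
/// let json = send_request(&req).await?;
///
/// // Charge the per-item adder once the response size is known.
/// limiter.debit_extra(info_extra_weight(&req, &json)).await;
/// ```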
#[derive(Debug)]
pub struct WeightedLimiter {
    capacity: f64,       // tokens per minute (e.g., 1200)
    refill_per_sec: f64, // capacity / 60
    state: Mutex<State>,
}

#[derive(Debug)]
struct State {
    tokens: f64,
    last_refill: Instant,
}

impl WeightedLimiter {
    pub fn per_minute(capacity: u32) -> Self {
        let cap = capacity as f64;
        Self {
            capacity: cap,
            refill_per_sec: cap / 60.0,
            state: Mutex::new(State {
                tokens: cap,
                last_refill: Instant::now(),
            }),
        }
    }

    /// Acquire `weight` tokens, sleeping until available.
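    ///
    /// Note: `weight` is expected to be at most `capacity`; a larger weight can never be covered
    /// by refills alone, so the call would keep sleeping and retrying.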
    pub async fn acquire(&self, weight: u32) {
        let need = weight as f64;
        loop {
            let mut st = self.state.lock().await;
            Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);

            if st.tokens >= need {
                st.tokens -= need;
                return;
            }
            // Not enough tokens: estimate how long the deficit takes to refill, release the
            // lock while sleeping, then re-check (another task may have drained the bucket).
            let deficit = need - st.tokens;
            let secs = deficit / self.refill_per_sec;
            drop(st);
            tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
        }
    }

    /// Post-response debit for per-item weight adders (may temporarily clamp the bucket to 0).
    pub async fn debit_extra(&self, extra: u32) {
        if extra == 0 {
            return;
        }
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        st.tokens = (st.tokens - extra as f64).max(0.0);
    }

    pub async fn snapshot(&self) -> RateLimitSnapshot {
        let mut st = self.state.lock().await;
        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
        RateLimitSnapshot {
            capacity: self.capacity as u32,
            tokens: st.tokens.max(0.0) as u32,
        }
    }

    fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
        let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
        if dt > 0.0 {
            st.tokens = (st.tokens + dt * per_sec).min(cap);
            st.last_refill = Instant::now();
        }
    }
}

#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    pub capacity: u32,
    pub tokens: u32,
}

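/// Full-jitter exponential backoff: returns a pseudo-random delay in `[0, ceiling)`, where
/// `ceiling = min(cap, base * 2^attempt)` and is clamped to at least `base`.
///
/// Worked example (illustrative numbers): with `base = 250ms`, `cap = 5s`, and `attempt = 3`,
/// the ceiling is `min(5000, 250 * 8) = 2000ms`, so the returned delay falls in `[0, 2000ms)`.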
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
        time::{SystemTime, UNIX_EPOCH},
    };

    // Simple pseudo-random value derived from the attempt number and the wall-clock time
    let mut hasher = DefaultHasher::new();
    attempt.hash(&mut hasher);
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos()
        .hash(&mut hasher);
    let hash = hasher.finish();

    // Exponential ceiling: min(cap, base * 2^attempt), clamped to at least `base` (and never zero)
    let max = (base.as_millis() as u64)
        .saturating_mul(1u64 << attempt.min(16))
        .min(cap.as_millis() as u64)
        .max(base.as_millis() as u64)
        .max(1);
    Duration::from_millis(hash % max)
}

/// Classify Info requests into weight classes based on `request_type`.
/// Since `InfoRequest` is a struct carrying its `request_type` as a string, we match on that.
pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
    match req.request_type.as_str() {
        // Cheap (2)
        "l2Book"
        | "allMids"
        | "clearinghouseState"
        | "orderStatus"
        | "spotClearinghouseState"
        | "exchangeStatus" => 2,
        // Very expensive (60)
        "userRole" => 60,
        // Default (20)
        _ => 20,
    }
}

/// Extra weight for heavy Info endpoints: +1 per 20 items for most, +1 per 60 for candleSnapshot.
/// We count the largest array in the response (robust to schema variants).
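///
/// Worked example (illustrative): a `candleSnapshot` response with 500 candles adds
/// `500 / 60 = 8` extra weight, while a `userFills` response with 45 fills adds `45 / 20 = 2`.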
pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
    let items = match json {
        Value::Array(a) => a.len(),
        Value::Object(m) => m
            .values()
            .filter_map(|v| v.as_array().map(|a| a.len()))
            .max()
            .unwrap_or(0),
        _ => 0,
    };

    let unit = match req.request_type.as_str() {
        "candleSnapshot" => 60usize, // +1 per 60
        "recentTrades"
        | "historicalOrders"
        | "userFills"
        | "userFillsByTime"
        | "fundingHistory"
        | "userFunding"
        | "nonUserFundingUpdates"
        | "twapHistory"
        | "userTwapSliceFills"
        | "userTwapSliceFillsByTime"
        | "delegatorHistory"
        | "delegatorRewards"
        | "validatorStats" => 20usize, // +1 per 20
        _ => return 0,
    };
    (items / unit) as u32
}

/// Exchange: 1 + floor(batch_len / 40)
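///
/// Worked example (illustrative): a batch of 85 orders costs `1 + 85 / 40 = 3` weight units,
/// while non-batch actions such as `UpdateLeverage` cost the flat base weight of 1.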
pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
    use crate::http::query::ExchangeActionParams;

    // Extract batch size from typed params
    let batch_size = match &action.params {
        ExchangeActionParams::Order(params) => params.orders.len(),
        ExchangeActionParams::Cancel(params) => params.cancels.len(),
        ExchangeActionParams::Modify(_) => {
            // Modify is for a single order
            1
        }
        ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
            0
        }
    };
    1 + (batch_size as u32 / 40)
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, InfoRequest,
        InfoRequestParams, L2BookParams, OrderParams, UpdateLeverageParams, UserFillsParams,
    };

    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        use rust_decimal::Decimal;

        use super::super::models::{
            Cloid, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
            HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
        };

        let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
            .map(|_| HyperliquidExecPlaceOrderRequest {
                asset: 0,
                is_buy: true,
                price: Decimal::new(50000, 0),
                size: Decimal::new(1, 0),
                reduce_only: false,
                kind: HyperliquidExecOrderKind::Limit {
                    limit: HyperliquidExecLimitParams {
                        tif: HyperliquidExecTif::Gtc,
                    },
                },
                cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params: ExchangeActionParams::Order(OrderParams {
                orders,
                grouping: "na".to_string(),
            }),
        };
        assert_eq!(exchange_weight(&action), expected_weight);
    }

    #[rstest]
    fn test_exchange_weight_cancel() {
        use super::super::models::{Cloid, HyperliquidExecCancelByCloidRequest};

        let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
            .map(|_| HyperliquidExecCancelByCloidRequest {
                asset: 0,
                cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };
        assert_eq!(exchange_weight(&action), 2);
    }

    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
                asset: 1,
                is_cross: true,
                leverage: 10,
            }),
        };
        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    #[rstest]
    #[case("l2Book", 2)]
    #[case("allMids", 2)]
    #[case("clearinghouseState", 2)]
    #[case("orderStatus", 2)]
    #[case("spotClearinghouseState", 2)]
    #[case("exchangeStatus", 2)]
    #[case("userRole", 60)]
    #[case("userFills", 20)]
    #[case("unknownEndpoint", 20)]
    fn test_info_base_weights(#[case] request_type: &str, #[case] expected_weight: u32) {
        let request = InfoRequest {
            request_type: request_type.to_string(),
            params: InfoRequestParams::L2Book(L2BookParams {
                coin: "BTC".to_string(),
            }),
        };
        assert_eq!(info_base_weight(&request), expected_weight);
    }

    #[rstest]
    fn test_info_extra_weight_no_charging() {
        let l2_book = InfoRequest {
            request_type: "l2Book".to_string(),
            params: InfoRequestParams::L2Book(L2BookParams {
                coin: "BTC".to_string(),
            }),
        };
        let large_json = json!(vec![1; 1000]);
        assert_eq!(info_extra_weight(&l2_book, &large_json), 0);
    }

    #[rstest]
    fn test_info_extra_weight_complex_json() {
        let user_fills = InfoRequest {
            request_type: "userFills".to_string(),
            params: InfoRequestParams::UserFills(UserFillsParams {
                user: "0x123".to_string(),
            }),
        };
        let complex_json = json!({
            "fills": vec![1; 40],
            "orders": vec![1; 20],
            "other": "data"
        });
        assert_eq!(info_extra_weight(&user_fills, &complex_json), 2); // largest array is 40, 40/20 = 2
    }

    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // Consume ~1200 in quick succession
        for _ in 0..60 {
            limiter.acquire(20).await; // 60 * 20 = 1200
        }

        // The next acquire should take time for tokens to refill
        let t0 = std::time::Instant::now();
        limiter.acquire(20).await;
        let elapsed = t0.elapsed();

        // Should take at least some time to refill (allow some jitter/timing variance)
        assert!(
            elapsed.as_millis() >= 500,
            "Expected significant delay, got {}ms",
            elapsed.as_millis()
        );
    }

    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        // Start with full bucket
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.capacity, 100);
        assert_eq!(snapshot.tokens, 100);

        // Acquire some tokens
        limiter.acquire(30).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 70);

        // Debit extra
        limiter.debit_extra(20).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 50);

        // Debit more than available (should clamp to 0)
        limiter.debit_extra(100).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 0);
    }

    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay = backoff_full_jitter(attempt, base, cap);

        // Should be in expected ranges (allowing for jitter)
        assert!(delay.as_millis() <= max_expected_ms as u128);
    }

    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay_high = backoff_full_jitter(10, base, cap);
        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}