Skip to main content

nautilus_hyperliquid/http/
rate_limits.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::time::{Duration, Instant};
17
18use serde_json::Value;
19
/// Weighted token-bucket rate limiter sized in tokens per minute.
///
/// Tokens refill continuously at `capacity / 60` per second; callers
/// consume tokens via `acquire` (blocking until available) and may apply
/// post-hoc debits via `debit_extra`.
#[derive(Debug)]
pub struct WeightedLimiter {
    // Maximum tokens the bucket can hold.
    capacity: f64,       // tokens per minute (e.g., 1200)
    // Continuous refill rate in tokens per second.
    refill_per_sec: f64, // capacity / 60
    // Mutable bucket state; async mutex so the guard is safe around awaits.
    state: tokio::sync::Mutex<State>,
}
26
/// Interior bucket state, guarded by the limiter's mutex.
#[derive(Debug)]
struct State {
    // Currently available tokens; fractional because refill is continuous.
    tokens: f64,
    // When the bucket was last refilled; elapsed time since then
    // determines how many tokens to add on the next refill.
    last_refill: Instant,
}
32
33impl WeightedLimiter {
34    pub fn per_minute(capacity: u32) -> Self {
35        let cap = capacity as f64;
36        Self {
37            capacity: cap,
38            refill_per_sec: cap / 60.0,
39            state: tokio::sync::Mutex::new(State {
40                tokens: cap,
41                last_refill: Instant::now(),
42            }),
43        }
44    }
45
46    /// Acquire `weight` tokens, sleeping until available.
47    pub async fn acquire(&self, weight: u32) {
48        let need = weight as f64;
49        loop {
50            let mut st = self.state.lock().await;
51            Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
52
53            if st.tokens >= need {
54                st.tokens -= need;
55                return;
56            }
57            let deficit = need - st.tokens;
58            let secs = deficit / self.refill_per_sec;
59            drop(st);
60            tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
61        }
62    }
63
64    /// Post-response debit for per-items adders (can temporarily clamp to 0).
65    pub async fn debit_extra(&self, extra: u32) {
66        if extra == 0 {
67            return;
68        }
69        let mut st = self.state.lock().await;
70        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
71        st.tokens = (st.tokens - extra as f64).max(0.0);
72    }
73
74    pub async fn snapshot(&self) -> RateLimitSnapshot {
75        let mut st = self.state.lock().await;
76        Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
77        RateLimitSnapshot {
78            capacity: self.capacity as u32,
79            tokens: st.tokens.max(0.0) as u32,
80        }
81    }
82
83    fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
84        let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
85        if dt > 0.0 {
86            st.tokens = (st.tokens + dt * per_sec).min(cap);
87            st.last_refill = Instant::now();
88        }
89    }
90}
91
/// Point-in-time view of a limiter's bucket.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitSnapshot {
    // Bucket capacity in tokens per minute.
    pub capacity: u32,
    // Tokens available at snapshot time (truncated to whole tokens).
    pub tokens: u32,
}
97
/// Full-jitter exponential backoff: a pseudo-random delay in
/// `[0, min(cap, base * 2^attempt))`, floored at `base` before jitter.
///
/// Uses a hash of the attempt number and the wall clock as a cheap
/// pseudo-random source, avoiding an RNG dependency.
pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
    };

    // Seed from attempt + wall-clock nanos.
    // NOTE: the previous seed, `Instant::now().elapsed()`, measures the age
    // of an Instant created on the same line — always ~0ns — which made the
    // "jitter" effectively deterministic per attempt. Wall-clock nanoseconds
    // actually vary between calls.
    let mut hasher = DefaultHasher::new();
    attempt.hash(&mut hasher);
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos()
        .hash(&mut hasher);
    let hash = hasher.finish();

    // Exponential window: base * 2^attempt, clamped into [base, cap].
    // The shift is capped at 16 to avoid overflowing the multiplier.
    let max = (base.as_millis() as u64)
        .saturating_mul(1u64 << attempt.min(16))
        .min(cap.as_millis() as u64)
        .max(base.as_millis() as u64)
        .max(1); // guard: `hash % 0` would panic when base and cap are both zero
    Duration::from_millis(hash % max)
}
116
117/// Classify Info requests into weight classes based on request_type.
118/// Since InfoRequest uses struct with request_type string, we match on that.
119pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 {
120    match req.request_type.as_str() {
121        // Cheap (2)
122        "l2Book"
123        | "allMids"
124        | "clearinghouseState"
125        | "orderStatus"
126        | "spotClearinghouseState"
127        | "exchangeStatus" => 2,
128        // Very expensive (60)
129        "userRole" => 60,
130        // Default (20)
131        _ => 20,
132    }
133}
134
135/// Extra weight for heavy Info endpoints: +1 per 20 (most), +1 per 60 for candleSnapshot.
136/// We count the largest array in the response (robust to schema variants).
137pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 {
138    let items = match json {
139        Value::Array(a) => a.len(),
140        Value::Object(m) => m
141            .values()
142            .filter_map(|v| v.as_array().map(|a| a.len()))
143            .max()
144            .unwrap_or(0),
145        _ => 0,
146    };
147
148    let unit = match req.request_type.as_str() {
149        "candleSnapshot" => 60usize, // +1 per 60
150        "recentTrades"
151        | "historicalOrders"
152        | "userFills"
153        | "userFillsByTime"
154        | "fundingHistory"
155        | "userFunding"
156        | "nonUserFundingUpdates"
157        | "twapHistory"
158        | "userTwapSliceFills"
159        | "userTwapSliceFillsByTime"
160        | "delegatorHistory"
161        | "delegatorRewards"
162        | "validatorStats" => 20usize, // +1 per 20
163        _ => return 0,
164    };
165    (items / unit) as u32
166}
167
168/// Exchange: 1 + floor(batch_len / 40)
169pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 {
170    use crate::http::query::ExchangeActionParams;
171
172    // Extract batch size from typed params
173    let batch_size = match &action.params {
174        ExchangeActionParams::Order(params) => params.orders.len(),
175        ExchangeActionParams::Cancel(params) => params.cancels.len(),
176        ExchangeActionParams::Modify(_) => {
177            // Modify is for a single order
178            1
179        }
180        ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
181            0
182        }
183    };
184    1 + (batch_size as u32 / 40)
185}
186
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::http::query::{
        CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
        UpdateLeverageParams,
    };

    // Order weight must step up by one for every full block of 40 batched orders.
    #[rstest]
    #[case(1, 1)]
    #[case(39, 1)]
    #[case(40, 2)]
    #[case(79, 2)]
    #[case(80, 3)]
    fn test_exchange_weight_order_steps_every_40(
        #[case] array_len: usize,
        #[case] expected_weight: u32,
    ) {
        use rust_decimal::Decimal;

        use super::super::models::{
            Cloid, HyperliquidExecGrouping, HyperliquidExecLimitParams, HyperliquidExecOrderKind,
            HyperliquidExecPlaceOrderRequest, HyperliquidExecTif,
        };

        // Build `array_len` identical limit orders; only the count matters
        // for the weight calculation, not the order contents.
        let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
            .map(|_| HyperliquidExecPlaceOrderRequest {
                asset: 0,
                is_buy: true,
                price: Decimal::new(50000, 0),
                size: Decimal::new(1, 0),
                reduce_only: false,
                kind: HyperliquidExecOrderKind::Limit {
                    limit: HyperliquidExecLimitParams {
                        tif: HyperliquidExecTif::Gtc,
                    },
                },
                cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Order,
            params: ExchangeActionParams::Order(OrderParams {
                orders,
                grouping: HyperliquidExecGrouping::Na,
                builder: None,
            }),
        };
        assert_eq!(exchange_weight(&action), expected_weight);
    }

    // Cancel batches use the same 1 + floor(n / 40) rule: 40 cancels -> 2.
    #[rstest]
    fn test_exchange_weight_cancel() {
        use super::super::models::{Cloid, HyperliquidExecCancelByCloidRequest};

        let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
            .map(|_| HyperliquidExecCancelByCloidRequest {
                asset: 0,
                cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
            })
            .collect();

        let action = ExchangeAction {
            action_type: ExchangeActionType::Cancel,
            params: ExchangeActionParams::Cancel(CancelParams { cancels }),
        };
        assert_eq!(exchange_weight(&action), 2);
    }

    // Non-batch actions (e.g., leverage updates) always weigh the base 1.
    #[rstest]
    fn test_exchange_weight_non_batch_action() {
        let update_leverage = ExchangeAction {
            action_type: ExchangeActionType::UpdateLeverage,
            params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
                asset: 1,
                is_cross: true,
                leverage: 10,
            }),
        };
        assert_eq!(exchange_weight(&update_leverage), 1);
    }

    // Draining the bucket to zero should force the next acquire to wait
    // for refill rather than returning immediately.
    #[tokio::test]
    async fn test_limiter_roughly_caps_to_capacity() {
        let limiter = WeightedLimiter::per_minute(1200);

        // Consume ~1200 in quick succession
        for _ in 0..60 {
            limiter.acquire(20).await; // 60 * 20 = 1200
        }

        // The next acquire should take time for tokens to refill
        let t0 = std::time::Instant::now();
        limiter.acquire(20).await;
        let elapsed = t0.elapsed();

        // Should take at least some time to refill (allow some jitter/timing variance)
        assert!(
            elapsed.as_millis() >= 500,
            "Expected significant delay, was {}ms",
            elapsed.as_millis()
        );
    }

    // debit_extra subtracts from the bucket and clamps at zero rather than
    // going negative.
    #[tokio::test]
    async fn test_limiter_debit_extra_works() {
        let limiter = WeightedLimiter::per_minute(100);

        // Start with full bucket
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.capacity, 100);
        assert_eq!(snapshot.tokens, 100);

        // Acquire some tokens
        limiter.acquire(30).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 70);

        // Debit extra
        limiter.debit_extra(20).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 50);

        // Debit more than available (should clamp to 0)
        limiter.debit_extra(100).await;
        let snapshot = limiter.snapshot().await;
        assert_eq!(snapshot.tokens, 0);
    }

    // The jitter window doubles per attempt: delay < base * 2^attempt.
    #[rstest]
    #[case(0, 100)]
    #[case(1, 200)]
    #[case(2, 400)]
    fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay = backoff_full_jitter(attempt, base, cap);

        // Should be in expected ranges (allowing for jitter)
        assert!(delay.as_millis() <= max_expected_ms as u128);
    }

    // Even at high attempt counts the delay never exceeds the configured cap.
    #[rstest]
    fn test_backoff_full_jitter_respects_cap() {
        let base = Duration::from_millis(100);
        let cap = Duration::from_secs(5);

        let delay_high = backoff_full_jitter(10, base, cap);
        assert!(delay_high.as_millis() <= cap.as_millis());
    }
}
341}