nautilus_hyperliquid/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Hyperliquid.
17
18use std::{
19    collections::HashSet,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
28use nautilus_model::{
29    data::BarType,
30    identifiers::AccountId,
31    instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34    RECONNECTED,
35    retry::{RetryManager, create_websocket_retry_manager},
36    websocket::{SubscriptionState, WebSocketClient},
37};
38use tokio_tungstenite::tungstenite::Message;
39use ustr::Ustr;
40
41use super::{
42    client::AssetContextDataType,
43    error::HyperliquidWsError,
44    messages::{
45        CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46        SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47    },
48    parse::{
49        parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50        parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
51    },
52};
53
54/// Commands sent from the outer client to the inner message handler.
55#[derive(Debug)]
56#[allow(
57    clippy::large_enum_variant,
58    reason = "Commands are ephemeral and immediately consumed"
59)]
60#[allow(private_interfaces)]
61pub enum HandlerCommand {
62    /// Set the WebSocketClient for the handler to use.
63    SetClient(WebSocketClient),
64    /// Disconnect the WebSocket connection.
65    Disconnect,
66    /// Subscribe to the given subscriptions.
67    Subscribe {
68        subscriptions: Vec<SubscriptionRequest>,
69    },
70    /// Unsubscribe from the given subscriptions.
71    Unsubscribe {
72        subscriptions: Vec<SubscriptionRequest>,
73    },
74    /// Initialize the instruments cache with the given instruments.
75    InitializeInstruments(Vec<InstrumentAny>),
76    /// Update a single instrument in the cache.
77    UpdateInstrument(InstrumentAny),
78    /// Add a bar type mapping for candle parsing.
79    AddBarType { key: String, bar_type: BarType },
80    /// Remove a bar type mapping.
81    RemoveBarType { key: String },
82    /// Update asset context subscriptions for a coin.
83    UpdateAssetContextSubs {
84        coin: Ustr,
85        data_types: HashSet<AssetContextDataType>,
86    },
87}
88
89pub(super) struct FeedHandler {
90    clock: &'static AtomicTime,
91    signal: Arc<AtomicBool>,
92    client: Option<WebSocketClient>,
93    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
94    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
95    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
96    account_id: Option<AccountId>,
97    subscriptions: SubscriptionState,
98    retry_manager: RetryManager<HyperliquidWsError>,
99    message_buffer: Vec<NautilusWsMessage>,
100    instruments_cache: AHashMap<Ustr, InstrumentAny>,
101    bar_types_cache: AHashMap<String, BarType>,
102    bar_cache: AHashMap<String, CandleData>,
103    asset_context_subs: AHashMap<Ustr, HashSet<AssetContextDataType>>,
104    mark_price_cache: AHashMap<Ustr, String>,
105    index_price_cache: AHashMap<Ustr, String>,
106    funding_rate_cache: AHashMap<Ustr, String>,
107}
108
109impl FeedHandler {
110    /// Creates a new [`FeedHandler`] instance.
111    pub(super) fn new(
112        signal: Arc<AtomicBool>,
113        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
114        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
115        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
116        account_id: Option<AccountId>,
117        subscriptions: SubscriptionState,
118    ) -> Self {
119        Self {
120            clock: get_atomic_clock_realtime(),
121            signal,
122            client: None,
123            cmd_rx,
124            raw_rx,
125            out_tx,
126            account_id,
127            subscriptions,
128            retry_manager: create_websocket_retry_manager(),
129            message_buffer: Vec::new(),
130            instruments_cache: AHashMap::new(),
131            bar_types_cache: AHashMap::new(),
132            bar_cache: AHashMap::new(),
133            asset_context_subs: AHashMap::new(),
134            mark_price_cache: AHashMap::new(),
135            index_price_cache: AHashMap::new(),
136            funding_rate_cache: AHashMap::new(),
137        }
138    }
139
140    /// Send a message to the output channel.
141    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
142        self.out_tx
143            .send(msg)
144            .map_err(|e| format!("Failed to send message: {e}"))
145    }
146
147    /// Check if the handler has received a stop signal.
148    pub(super) fn is_stopped(&self) -> bool {
149        self.signal.load(Ordering::Relaxed)
150    }
151
152    /// Sends a WebSocket message with retry logic.
153    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
154        if let Some(client) = &self.client {
155            self.retry_manager
156                .execute_with_retry(
157                    "websocket_send",
158                    || {
159                        let payload = payload.clone();
160                        async move {
161                            client.send_text(payload, None).await.map_err(|e| {
162                                HyperliquidWsError::ClientError(format!("Send failed: {e}"))
163                            })
164                        }
165                    },
166                    should_retry_hyperliquid_error,
167                    create_hyperliquid_timeout_error,
168                )
169                .await
170                .map_err(|e| anyhow::anyhow!("{e}"))
171        } else {
172            Err(anyhow::anyhow!("No WebSocket client available"))
173        }
174    }
175
176    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
177        if !self.message_buffer.is_empty() {
178            return Some(self.message_buffer.remove(0));
179        }
180
181        loop {
182            tokio::select! {
183                Some(cmd) = self.cmd_rx.recv() => {
184                    match cmd {
185                        HandlerCommand::SetClient(client) => {
186                            tracing::debug!("Setting WebSocket client in handler");
187                            self.client = Some(client);
188                        }
189                        HandlerCommand::Disconnect => {
190                            tracing::debug!("Handler received disconnect command");
191                            if let Some(ref client) = self.client {
192                                client.disconnect().await;
193                            }
194                            self.signal.store(true, Ordering::SeqCst);
195                            return None;
196                        }
197                        HandlerCommand::Subscribe { subscriptions } => {
198                            for subscription in subscriptions {
199                                let key = subscription_to_key(&subscription);
200                                self.subscriptions.mark_subscribe(&key);
201
202                                let request = HyperliquidWsRequest::Subscribe { subscription };
203                                match serde_json::to_string(&request) {
204                                    Ok(payload) => {
205                                        tracing::debug!("Sending subscribe payload: {payload}");
206                                        if let Err(e) = self.send_with_retry(payload).await {
207                                            tracing::error!("Error subscribing to {key}: {e}");
208                                            self.subscriptions.mark_failure(&key);
209                                        }
210                                    }
211                                    Err(e) => {
212                                        tracing::error!("Error serializing subscription for {key}: {e}");
213                                        self.subscriptions.mark_failure(&key);
214                                    }
215                                }
216                            }
217                        }
218                        HandlerCommand::Unsubscribe { subscriptions } => {
219                            for subscription in subscriptions {
220                                let key = subscription_to_key(&subscription);
221                                self.subscriptions.mark_unsubscribe(&key);
222
223                                let request = HyperliquidWsRequest::Unsubscribe { subscription };
224                                match serde_json::to_string(&request) {
225                                    Ok(payload) => {
226                                        tracing::debug!("Sending unsubscribe payload: {payload}");
227                                        if let Err(e) = self.send_with_retry(payload).await {
228                                            tracing::error!("Error unsubscribing from {key}: {e}");
229                                        }
230                                    }
231                                    Err(e) => {
232                                        tracing::error!("Error serializing unsubscription for {key}: {e}");
233                                    }
234                                }
235                            }
236                        }
237                        HandlerCommand::InitializeInstruments(instruments) => {
238                            for inst in instruments {
239                                let full_symbol = inst.symbol().inner();
240                                let coin = inst.raw_symbol().inner();
241
242                                self.instruments_cache.insert(full_symbol, inst.clone());
243                                self.instruments_cache.insert(coin, inst);
244                            }
245                        }
246                        HandlerCommand::UpdateInstrument(inst) => {
247                            let full_symbol = inst.symbol().inner();
248                            let coin = inst.raw_symbol().inner();
249
250                            self.instruments_cache.insert(full_symbol, inst.clone());
251                            self.instruments_cache.insert(coin, inst);
252                        }
253                        HandlerCommand::AddBarType { key, bar_type } => {
254                            self.bar_types_cache.insert(key, bar_type);
255                        }
256                        HandlerCommand::RemoveBarType { key } => {
257                            self.bar_types_cache.remove(&key);
258                            self.bar_cache.remove(&key);
259                        }
260                        HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
261                            if data_types.is_empty() {
262                                self.asset_context_subs.remove(&coin);
263                            } else {
264                                self.asset_context_subs.insert(coin, data_types);
265                            }
266                        }
267                    }
268                    continue;
269                }
270
271                Some(raw_msg) = self.raw_rx.recv() => {
272                    match raw_msg {
273                        Message::Text(text) => {
274                            if text == RECONNECTED {
275                                tracing::info!("Received RECONNECTED sentinel");
276                                return Some(NautilusWsMessage::Reconnected);
277                            }
278
279                            match serde_json::from_str::<HyperliquidWsMessage>(&text) {
280                                Ok(msg) => {
281                                    let ts_init = self.clock.get_time_ns();
282
283                                    let nautilus_msgs = Self::parse_to_nautilus_messages(
284                                        msg,
285                                        &self.instruments_cache,
286                                        &self.bar_types_cache,
287                                        self.account_id,
288                                        ts_init,
289                                        &self.asset_context_subs,
290                                        &mut self.mark_price_cache,
291                                        &mut self.index_price_cache,
292                                        &mut self.funding_rate_cache,
293                                        &mut self.bar_cache,
294                                    );
295
296                                    if !nautilus_msgs.is_empty() {
297                                        let mut iter = nautilus_msgs.into_iter();
298                                        let first = iter.next().unwrap();
299                                        self.message_buffer.extend(iter);
300                                        return Some(first);
301                                    }
302                                }
303                                Err(e) => {
304                                    tracing::error!("Error parsing WebSocket message: {e}, text: {text}");
305                                }
306                            }
307                        }
308                        Message::Ping(data) => {
309                            if let Some(ref client) = self.client
310                                && let Err(e) = client.send_pong(data.to_vec()).await {
311                                tracing::error!("Error sending pong: {e}");
312                            }
313                        }
314                        Message::Close(_) => {
315                            tracing::info!("Received WebSocket close frame");
316                            return None;
317                        }
318                        _ => {}
319                    }
320                }
321
322                else => {
323                    tracing::debug!("Handler shutting down: stream ended or command channel closed");
324                    return None;
325                }
326            }
327        }
328    }
329
330    #[allow(clippy::too_many_arguments)]
331    fn parse_to_nautilus_messages(
332        msg: HyperliquidWsMessage,
333        instruments: &AHashMap<Ustr, InstrumentAny>,
334        bar_types: &AHashMap<String, BarType>,
335        account_id: Option<AccountId>,
336        ts_init: UnixNanos,
337        asset_context_subs: &AHashMap<Ustr, HashSet<AssetContextDataType>>,
338        mark_price_cache: &mut AHashMap<Ustr, String>,
339        index_price_cache: &mut AHashMap<Ustr, String>,
340        funding_rate_cache: &mut AHashMap<Ustr, String>,
341        bar_cache: &mut AHashMap<String, CandleData>,
342    ) -> Vec<NautilusWsMessage> {
343        let mut result = Vec::new();
344
345        match msg {
346            HyperliquidWsMessage::OrderUpdates { data } => {
347                if let Some(account_id) = account_id
348                    && let Some(msg) =
349                        Self::handle_order_updates(&data, instruments, account_id, ts_init)
350                {
351                    result.push(msg);
352                }
353            }
354            HyperliquidWsMessage::UserEvents { data } => {
355                if let Some(account_id) = account_id
356                    && let WsUserEventData::Fills { fills } = data
357                    && let Some(msg) =
358                        Self::handle_user_fills(&fills, instruments, account_id, ts_init)
359                {
360                    result.push(msg);
361                }
362            }
363            HyperliquidWsMessage::Trades { data } => {
364                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
365                    result.push(msg);
366                }
367            }
368            HyperliquidWsMessage::Bbo { data } => {
369                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
370                    result.push(msg);
371                }
372            }
373            HyperliquidWsMessage::L2Book { data } => {
374                if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
375                    result.push(msg);
376                }
377            }
378            HyperliquidWsMessage::Candle { data } => {
379                if let Some(msg) =
380                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
381                {
382                    result.push(msg);
383                }
384            }
385            HyperliquidWsMessage::ActiveAssetCtx { data }
386            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
387                result.extend(Self::handle_asset_context(
388                    &data,
389                    instruments,
390                    asset_context_subs,
391                    mark_price_cache,
392                    index_price_cache,
393                    funding_rate_cache,
394                    ts_init,
395                ));
396            }
397            HyperliquidWsMessage::Error { data } => {
398                tracing::warn!("Received error from Hyperliquid WebSocket: {data}");
399            }
400            // Ignore other message types (subscription confirmations, etc)
401            _ => {}
402        }
403
404        result
405    }
406
407    fn handle_order_updates(
408        data: &[super::messages::WsOrderData],
409        instruments: &AHashMap<Ustr, InstrumentAny>,
410        account_id: AccountId,
411        ts_init: UnixNanos,
412    ) -> Option<NautilusWsMessage> {
413        let mut exec_reports = Vec::new();
414
415        for order_update in data {
416            if let Some(instrument) = instruments.get(&order_update.order.coin) {
417                match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
418                    Ok(report) => {
419                        exec_reports.push(ExecutionReport::Order(report));
420                    }
421                    Err(e) => {
422                        tracing::error!("Error parsing order update: {e}");
423                    }
424                }
425            } else {
426                tracing::debug!("No instrument found for coin: {}", order_update.order.coin);
427            }
428        }
429
430        if !exec_reports.is_empty() {
431            Some(NautilusWsMessage::ExecutionReports(exec_reports))
432        } else {
433            None
434        }
435    }
436
437    fn handle_user_fills(
438        fills: &[super::messages::WsFillData],
439        instruments: &AHashMap<Ustr, InstrumentAny>,
440        account_id: AccountId,
441        ts_init: UnixNanos,
442    ) -> Option<NautilusWsMessage> {
443        let mut exec_reports = Vec::new();
444
445        for fill in fills {
446            if let Some(instrument) = instruments.get(&fill.coin) {
447                match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
448                    Ok(report) => {
449                        exec_reports.push(ExecutionReport::Fill(report));
450                    }
451                    Err(e) => {
452                        tracing::error!("Error parsing fill: {e}");
453                    }
454                }
455            } else {
456                tracing::debug!("No instrument found for coin: {}", fill.coin);
457            }
458        }
459
460        if !exec_reports.is_empty() {
461            Some(NautilusWsMessage::ExecutionReports(exec_reports))
462        } else {
463            None
464        }
465    }
466
467    fn handle_trades(
468        data: &[super::messages::WsTradeData],
469        instruments: &AHashMap<Ustr, InstrumentAny>,
470        ts_init: UnixNanos,
471    ) -> Option<NautilusWsMessage> {
472        let mut trade_ticks = Vec::new();
473
474        for trade in data {
475            if let Some(instrument) = instruments.get(&trade.coin) {
476                match parse_ws_trade_tick(trade, instrument, ts_init) {
477                    Ok(tick) => trade_ticks.push(tick),
478                    Err(e) => {
479                        tracing::error!("Error parsing trade tick: {e}");
480                    }
481                }
482            } else {
483                tracing::debug!("No instrument found for coin: {}", trade.coin);
484            }
485        }
486
487        if !trade_ticks.is_empty() {
488            Some(NautilusWsMessage::Trades(trade_ticks))
489        } else {
490            None
491        }
492    }
493
494    fn handle_bbo(
495        data: &super::messages::WsBboData,
496        instruments: &AHashMap<Ustr, InstrumentAny>,
497        ts_init: UnixNanos,
498    ) -> Option<NautilusWsMessage> {
499        if let Some(instrument) = instruments.get(&data.coin) {
500            match parse_ws_quote_tick(data, instrument, ts_init) {
501                Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
502                Err(e) => {
503                    tracing::error!("Error parsing quote tick: {e}");
504                    None
505                }
506            }
507        } else {
508            tracing::debug!("No instrument found for coin: {}", data.coin);
509            None
510        }
511    }
512
513    fn handle_l2_book(
514        data: &super::messages::WsBookData,
515        instruments: &AHashMap<Ustr, InstrumentAny>,
516        ts_init: UnixNanos,
517    ) -> Option<NautilusWsMessage> {
518        if let Some(instrument) = instruments.get(&data.coin) {
519            match parse_ws_order_book_deltas(data, instrument, ts_init) {
520                Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
521                Err(e) => {
522                    tracing::error!("Error parsing order book deltas: {e}");
523                    None
524                }
525            }
526        } else {
527            tracing::debug!("No instrument found for coin: {}", data.coin);
528            None
529        }
530    }
531
532    fn handle_candle(
533        data: &CandleData,
534        instruments: &AHashMap<Ustr, InstrumentAny>,
535        bar_types: &AHashMap<String, BarType>,
536        bar_cache: &mut AHashMap<String, CandleData>,
537        ts_init: UnixNanos,
538    ) -> Option<NautilusWsMessage> {
539        let key = format!("candle:{}:{}", data.s, data.i);
540
541        let mut closed_bar = None;
542        if let Some(cached) = bar_cache.get(&key) {
543            // Emit cached bar when close_time changes, indicating the previous period closed
544            if cached.close_time != data.close_time {
545                tracing::debug!(
546                    "Bar period changed for {}: prev_close_time={}, new_close_time={}",
547                    data.s,
548                    cached.close_time,
549                    data.close_time
550                );
551                closed_bar = Some(cached.clone());
552            }
553        }
554
555        bar_cache.insert(key.clone(), data.clone());
556
557        if let Some(closed_data) = closed_bar {
558            if let Some(bar_type) = bar_types.get(&key) {
559                if let Some(instrument) = instruments.get(&data.s) {
560                    match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
561                        Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
562                        Err(e) => {
563                            tracing::error!("Error parsing closed candle: {e}");
564                        }
565                    }
566                } else {
567                    tracing::debug!("No instrument found for coin: {}", data.s);
568                }
569            } else {
570                tracing::debug!("No bar type found for key: {key}");
571            }
572        }
573
574        None
575    }
576
577    fn handle_asset_context(
578        data: &WsActiveAssetCtxData,
579        instruments: &AHashMap<Ustr, InstrumentAny>,
580        asset_context_subs: &AHashMap<Ustr, HashSet<AssetContextDataType>>,
581        mark_price_cache: &mut AHashMap<Ustr, String>,
582        index_price_cache: &mut AHashMap<Ustr, String>,
583        funding_rate_cache: &mut AHashMap<Ustr, String>,
584        ts_init: UnixNanos,
585    ) -> Vec<NautilusWsMessage> {
586        let mut result = Vec::new();
587
588        let coin = match data {
589            WsActiveAssetCtxData::Perp { coin, .. } => coin,
590            WsActiveAssetCtxData::Spot { coin, .. } => coin,
591        };
592
593        if let Some(instrument) = instruments.get(coin) {
594            let (mark_px, oracle_px, funding) = match data {
595                WsActiveAssetCtxData::Perp { ctx, .. } => (
596                    &ctx.shared.mark_px,
597                    Some(&ctx.oracle_px),
598                    Some(&ctx.funding),
599                ),
600                WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
601            };
602
603            let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
604            let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
605            let funding_changed =
606                funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
607
608            let subscribed_types = asset_context_subs.get(coin);
609
610            if mark_changed || index_changed || funding_changed {
611                match parse_ws_asset_context(data, instrument, ts_init) {
612                    Ok((mark_price, index_price, funding_rate)) => {
613                        if mark_changed
614                            && subscribed_types
615                                .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
616                        {
617                            mark_price_cache.insert(*coin, mark_px.clone());
618                            result.push(NautilusWsMessage::MarkPrice(mark_price));
619                        }
620                        if index_changed
621                            && subscribed_types
622                                .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
623                        {
624                            if let Some(px) = oracle_px {
625                                index_price_cache.insert(*coin, px.clone());
626                            }
627                            if let Some(index) = index_price {
628                                result.push(NautilusWsMessage::IndexPrice(index));
629                            }
630                        }
631                        if funding_changed
632                            && subscribed_types
633                                .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
634                        {
635                            if let Some(rate) = funding {
636                                funding_rate_cache.insert(*coin, rate.clone());
637                            }
638                            if let Some(funding) = funding_rate {
639                                result.push(NautilusWsMessage::FundingRate(funding));
640                            }
641                        }
642                    }
643                    Err(e) => {
644                        tracing::error!("Error parsing asset context: {e}");
645                    }
646                }
647            }
648        } else {
649            tracing::debug!("No instrument found for coin: {coin}");
650        }
651
652        result
653    }
654}
655
656/// Creates a canonical subscription key from a SubscriptionRequest for tracking.
657fn subscription_to_key(sub: &SubscriptionRequest) -> String {
658    match sub {
659        SubscriptionRequest::AllMids { dex } => {
660            if let Some(dex_name) = dex {
661                format!("allMids:{dex_name}")
662            } else {
663                "allMids".to_string()
664            }
665        }
666        SubscriptionRequest::Notification { user } => format!("notification:{user}"),
667        SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
668        SubscriptionRequest::Candle { coin, interval } => {
669            format!("candle:{coin}:{}", interval.as_str())
670        }
671        SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
672        SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
673        SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
674        SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
675        SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
676        SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
677        SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
678            format!("userNonFundingLedgerUpdates:{user}")
679        }
680        SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
681        SubscriptionRequest::ActiveSpotAssetCtx { coin } => format!("activeSpotAssetCtx:{coin}"),
682        SubscriptionRequest::ActiveAssetData { user, coin } => {
683            format!("activeAssetData:{user}:{coin}")
684        }
685        SubscriptionRequest::UserTwapSliceFills { user } => format!("userTwapSliceFills:{user}"),
686        SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
687        SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
688    }
689}
690
691/// Determines whether a Hyperliquid WebSocket error should trigger a retry.
692pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
693    match error {
694        HyperliquidWsError::TungsteniteError(_) => true,
695        HyperliquidWsError::ClientError(msg) => {
696            let msg_lower = msg.to_lowercase();
697            msg_lower.contains("timeout")
698                || msg_lower.contains("timed out")
699                || msg_lower.contains("connection")
700                || msg_lower.contains("network")
701        }
702        _ => false,
703    }
704}
705
706/// Creates a timeout error for Hyperliquid retry logic.
707pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
708    HyperliquidWsError::ClientError(msg)
709}