Skip to main content

nautilus_hyperliquid/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Hyperliquid.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::BarType,
29    identifiers::{AccountId, ClientOrderId},
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33    RECONNECTED,
34    retry::{RetryManager, create_websocket_retry_manager},
35    websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41    client::AssetContextDataType,
42    error::HyperliquidWsError,
43    messages::{
44        CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
45        SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
46    },
47    parse::{
48        parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
49        parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
50    },
51};
52
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received over an unbounded channel and applied one at a time
/// by the handler's `next` loop before it resumes waiting for input.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
// NOTE(review): presumably required because some payload types (e.g. the
// subscription or asset-context types from sibling modules) are less visible
// than this `pub` enum — confirm and add a `reason`.
#[allow(private_interfaces)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// The handler cannot send anything (subscriptions, pongs) until a client
    /// has been provided.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler disconnects its client (if any), raises the stop signal and
    /// stops yielding messages.
    Disconnect,
    /// Subscribe to the given subscriptions.
    ///
    /// Each subscription is marked as pending in the subscription state and
    /// then sent; serialization or send errors mark it as failed.
    Subscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Unsubscribe from the given subscriptions.
    ///
    /// Each subscription is marked as unsubscribing and then sent; errors are
    /// logged only.
    Unsubscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Initialize the instruments cache with the given instruments.
    ///
    /// Instruments are keyed by their raw symbol; existing entries with the
    /// same key are overwritten.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Add a bar type mapping for candle parsing.
    ///
    /// The handler looks candles up under keys of the form
    /// `candle:{symbol}:{interval}`, so `key` must follow that format.
    AddBarType { key: String, bar_type: BarType },
    /// Remove a bar type mapping.
    ///
    /// Also discards the last candle cached under the same key.
    RemoveBarType { key: String },
    /// Update asset context subscriptions for a coin.
    ///
    /// An empty `data_types` set removes the coin's entry altogether.
    UpdateAssetContextSubs {
        coin: Ustr,
        data_types: AHashSet<AssetContextDataType>,
    },
    /// Cache spot fill coin mappings for instrument lookup.
    ///
    /// Currently accepted but ignored by the handler.
    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
}
89
/// Inner message handler that turns raw WebSocket frames and client commands
/// into [`NautilusWsMessage`] values.
pub(super) struct FeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Shared stop flag; set when a disconnect command is processed.
    signal: Arc<AtomicBool>,
    /// WebSocket client used for outbound traffic; `None` until `SetClient` arrives.
    client: Option<WebSocketClient>,
    /// Incoming commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel for parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account used for execution reports; order updates and fills are not
    /// converted when this is `None`.
    account_id: Option<AccountId>,
    /// Tracks pending/failed subscribe and unsubscribe requests by key.
    subscriptions: SubscriptionState,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<HyperliquidWsError>,
    /// Messages parsed from one frame but not yet returned by `next`.
    message_buffer: Vec<NautilusWsMessage>,
    /// Instruments keyed by raw symbol (the venue "coin").
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Maps venue `cloid` values back to the original client order IDs.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Bar types keyed by `candle:{symbol}:{interval}`.
    bar_types_cache: AHashMap<String, BarType>,
    /// Most recent candle per key, used to detect when a bar period has closed.
    bar_cache: AHashMap<String, CandleData>,
    /// Asset context data types requested per coin.
    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
    /// Recently seen fill trade IDs, used to skip duplicate fills.
    processed_trade_ids: FifoCache<u64, 10_000>,
    /// Last emitted raw mark price per coin, for change detection.
    mark_price_cache: AHashMap<Ustr, String>,
    /// Last emitted raw oracle (index) price per coin, for change detection.
    index_price_cache: AHashMap<Ustr, String>,
    /// Last emitted raw funding rate per coin, for change detection.
    funding_rate_cache: AHashMap<Ustr, String>,
}
111
112impl FeedHandler {
113    /// Creates a new [`FeedHandler`] instance.
114    pub(super) fn new(
115        signal: Arc<AtomicBool>,
116        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
117        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
118        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
119        account_id: Option<AccountId>,
120        subscriptions: SubscriptionState,
121        cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
122    ) -> Self {
123        Self {
124            clock: get_atomic_clock_realtime(),
125            signal,
126            client: None,
127            cmd_rx,
128            raw_rx,
129            out_tx,
130            account_id,
131            subscriptions,
132            retry_manager: create_websocket_retry_manager(),
133            message_buffer: Vec::new(),
134            instruments: AHashMap::new(),
135            cloid_cache,
136            bar_types_cache: AHashMap::new(),
137            bar_cache: AHashMap::new(),
138            asset_context_subs: AHashMap::new(),
139            processed_trade_ids: FifoCache::new(),
140            mark_price_cache: AHashMap::new(),
141            index_price_cache: AHashMap::new(),
142            funding_rate_cache: AHashMap::new(),
143        }
144    }
145
146    /// Send a message to the output channel.
147    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
148        self.out_tx
149            .send(msg)
150            .map_err(|e| format!("Failed to send message: {e}"))
151    }
152
153    /// Check if the handler has received a stop signal.
154    pub(super) fn is_stopped(&self) -> bool {
155        self.signal.load(Ordering::Relaxed)
156    }
157
158    /// Sends a WebSocket message with retry logic.
159    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
160        if let Some(client) = &self.client {
161            self.retry_manager
162                .execute_with_retry(
163                    "websocket_send",
164                    || {
165                        let payload = payload.clone();
166                        async move {
167                            client.send_text(payload, None).await.map_err(|e| {
168                                HyperliquidWsError::ClientError(format!("Send failed: {e}"))
169                            })
170                        }
171                    },
172                    should_retry_hyperliquid_error,
173                    create_hyperliquid_timeout_error,
174                )
175                .await
176                .map_err(|e| anyhow::anyhow!("{e}"))
177        } else {
178            Err(anyhow::anyhow!("No WebSocket client available"))
179        }
180    }
181
182    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
183        if !self.message_buffer.is_empty() {
184            return Some(self.message_buffer.remove(0));
185        }
186
187        loop {
188            tokio::select! {
189                Some(cmd) = self.cmd_rx.recv() => {
190                    match cmd {
191                        HandlerCommand::SetClient(client) => {
192                            log::debug!("Setting WebSocket client in handler");
193                            self.client = Some(client);
194                        }
195                        HandlerCommand::Disconnect => {
196                            log::debug!("Handler received disconnect command");
197                            if let Some(ref client) = self.client {
198                                client.disconnect().await;
199                            }
200                            self.signal.store(true, Ordering::SeqCst);
201                            return None;
202                        }
203                        HandlerCommand::Subscribe { subscriptions } => {
204                            for subscription in subscriptions {
205                                let key = subscription_to_key(&subscription);
206                                self.subscriptions.mark_subscribe(&key);
207
208                                let request = HyperliquidWsRequest::Subscribe { subscription };
209                                match serde_json::to_string(&request) {
210                                    Ok(payload) => {
211                                        log::debug!("Sending subscribe payload: {payload}");
212                                        if let Err(e) = self.send_with_retry(payload).await {
213                                            log::error!("Error subscribing to {key}: {e}");
214                                            self.subscriptions.mark_failure(&key);
215                                        }
216                                    }
217                                    Err(e) => {
218                                        log::error!("Error serializing subscription for {key}: {e}");
219                                        self.subscriptions.mark_failure(&key);
220                                    }
221                                }
222                            }
223                        }
224                        HandlerCommand::Unsubscribe { subscriptions } => {
225                            for subscription in subscriptions {
226                                let key = subscription_to_key(&subscription);
227                                self.subscriptions.mark_unsubscribe(&key);
228
229                                let request = HyperliquidWsRequest::Unsubscribe { subscription };
230                                match serde_json::to_string(&request) {
231                                    Ok(payload) => {
232                                        log::debug!("Sending unsubscribe payload: {payload}");
233                                        if let Err(e) = self.send_with_retry(payload).await {
234                                            log::error!("Error unsubscribing from {key}: {e}");
235                                        }
236                                    }
237                                    Err(e) => {
238                                        log::error!("Error serializing unsubscription for {key}: {e}");
239                                    }
240                                }
241                            }
242                        }
243                        HandlerCommand::InitializeInstruments(instruments) => {
244                            for inst in instruments {
245                                let coin = inst.raw_symbol().inner();
246                                self.instruments.insert(coin, inst);
247                            }
248                        }
249                        HandlerCommand::UpdateInstrument(inst) => {
250                            let coin = inst.raw_symbol().inner();
251                            self.instruments.insert(coin, inst);
252                        }
253                        HandlerCommand::AddBarType { key, bar_type } => {
254                            self.bar_types_cache.insert(key, bar_type);
255                        }
256                        HandlerCommand::RemoveBarType { key } => {
257                            self.bar_types_cache.remove(&key);
258                            self.bar_cache.remove(&key);
259                        }
260                        HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
261                            if data_types.is_empty() {
262                                self.asset_context_subs.remove(&coin);
263                            } else {
264                                self.asset_context_subs.insert(coin, data_types);
265                            }
266                        }
267                        HandlerCommand::CacheSpotFillCoins(_) => {
268                            // No longer needed - raw_symbol now contains the proper format
269                        }
270                    }
271                    continue;
272                }
273
274                Some(raw_msg) = self.raw_rx.recv() => {
275                    match raw_msg {
276                        Message::Text(text) => {
277                            if text == RECONNECTED {
278                                log::info!("Received RECONNECTED sentinel");
279                                return Some(NautilusWsMessage::Reconnected);
280                            }
281
282                            match serde_json::from_str::<HyperliquidWsMessage>(&text) {
283                                Ok(msg) => {
284                                    let ts_init = self.clock.get_time_ns();
285
286                                    let nautilus_msgs = Self::parse_to_nautilus_messages(
287                                        msg,
288                                        &self.instruments,
289                                        &self.cloid_cache,
290                                        &self.bar_types_cache,
291                                        self.account_id,
292                                        ts_init,
293                                        &self.asset_context_subs,
294                                        &mut self.processed_trade_ids,
295                                        &mut self.mark_price_cache,
296                                        &mut self.index_price_cache,
297                                        &mut self.funding_rate_cache,
298                                        &mut self.bar_cache,
299                                    );
300
301                                    if !nautilus_msgs.is_empty() {
302                                        let mut iter = nautilus_msgs.into_iter();
303                                        let first = iter.next().unwrap();
304                                        self.message_buffer.extend(iter);
305                                        return Some(first);
306                                    }
307                                }
308                                Err(e) => {
309                                    log::error!("Error parsing WebSocket message: {e}, text: {text}");
310                                }
311                            }
312                        }
313                        Message::Ping(data) => {
314                            if let Some(ref client) = self.client
315                                && let Err(e) = client.send_pong(data.to_vec()).await {
316                                log::error!("Error sending pong: {e}");
317                            }
318                        }
319                        Message::Close(_) => {
320                            log::info!("Received WebSocket close frame");
321                            return None;
322                        }
323                        _ => {}
324                    }
325                }
326
327                else => {
328                    log::debug!("Handler shutting down: stream ended or command channel closed");
329                    return None;
330                }
331            }
332        }
333    }
334
335    #[allow(clippy::too_many_arguments)]
336    fn parse_to_nautilus_messages(
337        msg: HyperliquidWsMessage,
338        instruments: &AHashMap<Ustr, InstrumentAny>,
339        cloid_cache: &DashMap<Ustr, ClientOrderId>,
340        bar_types: &AHashMap<String, BarType>,
341        account_id: Option<AccountId>,
342        ts_init: UnixNanos,
343        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
344        processed_trade_ids: &mut FifoCache<u64, 10_000>,
345        mark_price_cache: &mut AHashMap<Ustr, String>,
346        index_price_cache: &mut AHashMap<Ustr, String>,
347        funding_rate_cache: &mut AHashMap<Ustr, String>,
348        bar_cache: &mut AHashMap<String, CandleData>,
349    ) -> Vec<NautilusWsMessage> {
350        let mut result = Vec::new();
351
352        match msg {
353            HyperliquidWsMessage::OrderUpdates { data } => {
354                if let Some(account_id) = account_id
355                    && let Some(msg) = Self::handle_order_updates(
356                        &data,
357                        instruments,
358                        cloid_cache,
359                        account_id,
360                        ts_init,
361                    )
362                {
363                    result.push(msg);
364                }
365            }
366            HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
367                // Process fills from userEvents channel (userFills channel is redundant)
368                match data {
369                    WsUserEventData::Fills { fills } => {
370                        log::debug!("Received {} fill(s) from userEvents channel", fills.len());
371                        for fill in &fills {
372                            log::debug!(
373                                "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
374                                fill.oid,
375                                fill.coin,
376                                fill.side,
377                                fill.sz,
378                                fill.px
379                            );
380                        }
381                        if let Some(account_id) = account_id {
382                            log::debug!("Processing fills with account_id={account_id}");
383                            if let Some(msg) = Self::handle_user_fills(
384                                &fills,
385                                instruments,
386                                cloid_cache,
387                                account_id,
388                                ts_init,
389                                processed_trade_ids,
390                            ) {
391                                log::debug!("Successfully created fill message");
392                                result.push(msg);
393                            } else {
394                                log::debug!("handle_user_fills returned None (no new fills)");
395                            }
396                        } else {
397                            log::warn!("Cannot process fills: account_id is None");
398                        }
399                    }
400                    _ => {
401                        log::debug!("Received non-fill user event: {data:?}");
402                    }
403                }
404            }
405            HyperliquidWsMessage::UserFills { data } => {
406                // UserFills channel is redundant with userEvents, but handle it for
407                // backwards compatibility if explicitly subscribed
408                if let Some(account_id) = account_id
409                    && let Some(msg) = Self::handle_user_fills(
410                        &data.fills,
411                        instruments,
412                        cloid_cache,
413                        account_id,
414                        ts_init,
415                        processed_trade_ids,
416                    )
417                {
418                    result.push(msg);
419                }
420            }
421            HyperliquidWsMessage::Trades { data } => {
422                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
423                    result.push(msg);
424                }
425            }
426            HyperliquidWsMessage::Bbo { data } => {
427                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
428                    result.push(msg);
429                }
430            }
431            HyperliquidWsMessage::L2Book { data } => {
432                if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
433                    result.push(msg);
434                }
435            }
436            HyperliquidWsMessage::Candle { data } => {
437                if let Some(msg) =
438                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
439                {
440                    result.push(msg);
441                }
442            }
443            HyperliquidWsMessage::ActiveAssetCtx { data }
444            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
445                result.extend(Self::handle_asset_context(
446                    &data,
447                    instruments,
448                    asset_context_subs,
449                    mark_price_cache,
450                    index_price_cache,
451                    funding_rate_cache,
452                    ts_init,
453                ));
454            }
455            HyperliquidWsMessage::Error { data } => {
456                log::warn!("Received error from Hyperliquid WebSocket: {data}");
457            }
458            // Ignore other message types (subscription confirmations, etc)
459            _ => {}
460        }
461
462        result
463    }
464
465    fn handle_order_updates(
466        data: &[super::messages::WsOrderData],
467        instruments: &AHashMap<Ustr, InstrumentAny>,
468        cloid_cache: &DashMap<Ustr, ClientOrderId>,
469        account_id: AccountId,
470        ts_init: UnixNanos,
471    ) -> Option<NautilusWsMessage> {
472        let mut exec_reports = Vec::new();
473
474        for order_update in data {
475            let instrument = instruments.get(&order_update.order.coin);
476
477            if let Some(instrument) = instrument {
478                match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
479                    Ok(mut report) => {
480                        // Resolve cloid to real client_order_id if cached
481                        if let Some(cloid) = &order_update.order.cloid {
482                            let cloid_ustr = Ustr::from(cloid.as_str());
483                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
484                                let real_client_order_id = *entry.value();
485                                log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
486                                report.client_order_id = Some(real_client_order_id);
487                            }
488                        }
489                        exec_reports.push(ExecutionReport::Order(report));
490                    }
491                    Err(e) => {
492                        log::error!("Error parsing order update: {e}");
493                    }
494                }
495            } else {
496                log::debug!("No instrument found for coin: {}", order_update.order.coin);
497            }
498        }
499
500        if exec_reports.is_empty() {
501            None
502        } else {
503            Some(NautilusWsMessage::ExecutionReports(exec_reports))
504        }
505    }
506
507    fn handle_user_fills(
508        fills: &[super::messages::WsFillData],
509        instruments: &AHashMap<Ustr, InstrumentAny>,
510        cloid_cache: &DashMap<Ustr, ClientOrderId>,
511        account_id: AccountId,
512        ts_init: UnixNanos,
513        processed_trade_ids: &mut FifoCache<u64, 10_000>,
514    ) -> Option<NautilusWsMessage> {
515        let mut exec_reports = Vec::new();
516
517        for fill in fills {
518            // Skip duplicate fills (Hyperliquid sometimes sends duplicate userEvents)
519            if processed_trade_ids.contains(&fill.tid) {
520                log::debug!("Skipping duplicate fill: tid={}", fill.tid);
521                continue;
522            }
523            processed_trade_ids.add(fill.tid);
524
525            let instrument = instruments.get(&fill.coin);
526
527            if let Some(instrument) = instrument {
528                log::debug!("Found instrument for fill coin={}", fill.coin);
529                match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
530                    Ok(mut report) => {
531                        // Resolve cloid to real client_order_id if cached
532                        if let Some(cloid) = &fill.cloid {
533                            let cloid_ustr = Ustr::from(cloid.as_str());
534                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
535                                let real_client_order_id = *entry.value();
536                                log::debug!(
537                                    "Resolved fill cloid {cloid} -> {real_client_order_id}"
538                                );
539                                report.client_order_id = Some(real_client_order_id);
540                            }
541                        }
542                        log::debug!(
543                            "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
544                            report.venue_order_id,
545                            report.trade_id
546                        );
547                        exec_reports.push(ExecutionReport::Fill(report));
548                    }
549                    Err(e) => {
550                        log::error!("Error parsing fill: {e}");
551                    }
552                }
553            } else {
554                log::warn!(
555                    "No instrument found for fill coin={}. Keys: {:?}",
556                    fill.coin,
557                    instruments.keys().collect::<Vec<_>>()
558                );
559            }
560        }
561
562        if exec_reports.is_empty() {
563            None
564        } else {
565            Some(NautilusWsMessage::ExecutionReports(exec_reports))
566        }
567    }
568
569    fn handle_trades(
570        data: &[super::messages::WsTradeData],
571        instruments: &AHashMap<Ustr, InstrumentAny>,
572        ts_init: UnixNanos,
573    ) -> Option<NautilusWsMessage> {
574        let mut trade_ticks = Vec::new();
575
576        for trade in data {
577            if let Some(instrument) = instruments.get(&trade.coin) {
578                match parse_ws_trade_tick(trade, instrument, ts_init) {
579                    Ok(tick) => trade_ticks.push(tick),
580                    Err(e) => {
581                        log::error!("Error parsing trade tick: {e}");
582                    }
583                }
584            } else {
585                log::debug!("No instrument found for coin: {}", trade.coin);
586            }
587        }
588
589        if trade_ticks.is_empty() {
590            None
591        } else {
592            Some(NautilusWsMessage::Trades(trade_ticks))
593        }
594    }
595
596    fn handle_bbo(
597        data: &super::messages::WsBboData,
598        instruments: &AHashMap<Ustr, InstrumentAny>,
599        ts_init: UnixNanos,
600    ) -> Option<NautilusWsMessage> {
601        if let Some(instrument) = instruments.get(&data.coin) {
602            match parse_ws_quote_tick(data, instrument, ts_init) {
603                Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
604                Err(e) => {
605                    log::error!("Error parsing quote tick: {e}");
606                    None
607                }
608            }
609        } else {
610            log::debug!("No instrument found for coin: {}", data.coin);
611            None
612        }
613    }
614
615    fn handle_l2_book(
616        data: &super::messages::WsBookData,
617        instruments: &AHashMap<Ustr, InstrumentAny>,
618        ts_init: UnixNanos,
619    ) -> Option<NautilusWsMessage> {
620        if let Some(instrument) = instruments.get(&data.coin) {
621            match parse_ws_order_book_deltas(data, instrument, ts_init) {
622                Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
623                Err(e) => {
624                    log::error!("Error parsing order book deltas: {e}");
625                    None
626                }
627            }
628        } else {
629            log::debug!("No instrument found for coin: {}", data.coin);
630            None
631        }
632    }
633
634    fn handle_candle(
635        data: &CandleData,
636        instruments: &AHashMap<Ustr, InstrumentAny>,
637        bar_types: &AHashMap<String, BarType>,
638        bar_cache: &mut AHashMap<String, CandleData>,
639        ts_init: UnixNanos,
640    ) -> Option<NautilusWsMessage> {
641        let key = format!("candle:{}:{}", data.s, data.i);
642
643        let mut closed_bar = None;
644        if let Some(cached) = bar_cache.get(&key) {
645            // Emit cached bar when close_time changes, indicating the previous period closed
646            if cached.close_time != data.close_time {
647                log::debug!(
648                    "Bar period changed for {}: prev_close_time={}, new_close_time={}",
649                    data.s,
650                    cached.close_time,
651                    data.close_time
652                );
653                closed_bar = Some(cached.clone());
654            }
655        }
656
657        bar_cache.insert(key.clone(), data.clone());
658
659        if let Some(closed_data) = closed_bar {
660            if let Some(bar_type) = bar_types.get(&key) {
661                if let Some(instrument) = instruments.get(&data.s) {
662                    match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
663                        Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
664                        Err(e) => {
665                            log::error!("Error parsing closed candle: {e}");
666                        }
667                    }
668                } else {
669                    log::debug!("No instrument found for coin: {}", data.s);
670                }
671            } else {
672                log::debug!("No bar type found for key: {key}");
673            }
674        }
675
676        None
677    }
678
679    fn handle_asset_context(
680        data: &WsActiveAssetCtxData,
681        instruments: &AHashMap<Ustr, InstrumentAny>,
682        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
683        mark_price_cache: &mut AHashMap<Ustr, String>,
684        index_price_cache: &mut AHashMap<Ustr, String>,
685        funding_rate_cache: &mut AHashMap<Ustr, String>,
686        ts_init: UnixNanos,
687    ) -> Vec<NautilusWsMessage> {
688        let mut result = Vec::new();
689
690        let coin = match data {
691            WsActiveAssetCtxData::Perp { coin, .. } => coin,
692            WsActiveAssetCtxData::Spot { coin, .. } => coin,
693        };
694
695        if let Some(instrument) = instruments.get(coin) {
696            let (mark_px, oracle_px, funding) = match data {
697                WsActiveAssetCtxData::Perp { ctx, .. } => (
698                    &ctx.shared.mark_px,
699                    Some(&ctx.oracle_px),
700                    Some(&ctx.funding),
701                ),
702                WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
703            };
704
705            let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
706            let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
707            let funding_changed =
708                funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
709
710            let subscribed_types = asset_context_subs.get(coin);
711
712            if mark_changed || index_changed || funding_changed {
713                match parse_ws_asset_context(data, instrument, ts_init) {
714                    Ok((mark_price, index_price, funding_rate)) => {
715                        if mark_changed
716                            && subscribed_types
717                                .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
718                        {
719                            mark_price_cache.insert(*coin, mark_px.clone());
720                            result.push(NautilusWsMessage::MarkPrice(mark_price));
721                        }
722                        if index_changed
723                            && subscribed_types
724                                .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
725                        {
726                            if let Some(px) = oracle_px {
727                                index_price_cache.insert(*coin, px.clone());
728                            }
729                            if let Some(index) = index_price {
730                                result.push(NautilusWsMessage::IndexPrice(index));
731                            }
732                        }
733                        if funding_changed
734                            && subscribed_types
735                                .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
736                        {
737                            if let Some(rate) = funding {
738                                funding_rate_cache.insert(*coin, rate.clone());
739                            }
740                            if let Some(funding) = funding_rate {
741                                result.push(NautilusWsMessage::FundingRate(funding));
742                            }
743                        }
744                    }
745                    Err(e) => {
746                        log::error!("Error parsing asset context: {e}");
747                    }
748                }
749            }
750        } else {
751            log::debug!("No instrument found for coin: {coin}");
752        }
753
754        result
755    }
756}
757
758/// Creates a canonical subscription key from a SubscriptionRequest for tracking.
759fn subscription_to_key(sub: &SubscriptionRequest) -> String {
760    match sub {
761        SubscriptionRequest::AllMids { dex } => {
762            if let Some(dex_name) = dex {
763                format!("allMids:{dex_name}")
764            } else {
765                "allMids".to_string()
766            }
767        }
768        SubscriptionRequest::Notification { user } => format!("notification:{user}"),
769        SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
770        SubscriptionRequest::Candle { coin, interval } => {
771            format!("candle:{coin}:{}", interval.as_str())
772        }
773        SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
774        SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
775        SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
776        SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
777        SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
778        SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
779        SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
780            format!("userNonFundingLedgerUpdates:{user}")
781        }
782        SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
783        SubscriptionRequest::ActiveSpotAssetCtx { coin } => format!("activeSpotAssetCtx:{coin}"),
784        SubscriptionRequest::ActiveAssetData { user, coin } => {
785            format!("activeAssetData:{user}:{coin}")
786        }
787        SubscriptionRequest::UserTwapSliceFills { user } => format!("userTwapSliceFills:{user}"),
788        SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
789        SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
790    }
791}
792
793/// Determines whether a Hyperliquid WebSocket error should trigger a retry.
794pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
795    match error {
796        HyperliquidWsError::TungsteniteError(_) => true,
797        HyperliquidWsError::ClientError(msg) => {
798            let msg_lower = msg.to_lowercase();
799            msg_lower.contains("timeout")
800                || msg_lower.contains("timed out")
801                || msg_lower.contains("connection")
802                || msg_lower.contains("network")
803        }
804        _ => false,
805    }
806}
807
808/// Creates a timeout error for Hyperliquid retry logic.
809pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
810    HyperliquidWsError::ClientError(msg)
811}