nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashSet,
18    str::FromStr,
19    sync::{
20        Arc,
21        atomic::{AtomicBool, AtomicU8, Ordering},
22    },
23};
24
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30    data::BarType,
31    identifiers::{AccountId, InstrumentId},
32    instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35    mode::ConnectionMode,
36    websocket::{
37        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38    },
39};
40use ustr::Ustr;
41
42use crate::{
43    common::{HyperliquidProductType, enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44    websocket::{
45        handler::{FeedHandler, HandlerCommand},
46        messages::{NautilusWsMessage, SubscriptionRequest},
47    },
48};
49
50const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
51
/// Represents the different data types available from asset context subscriptions.
///
/// The client tracks a set of these per coin to decide which updates are of interest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Mark price updates.
    MarkPrice,
    /// Index/oracle price updates.
    IndexPrice,
    /// Funding rate updates.
    FundingRate,
}
59
/// Hyperliquid WebSocket client following the BitMEX pattern.
///
/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// Product type supplied at construction.
    product_type: HyperliquidProductType,
    /// Shared connection mode; `connect()` swaps in the underlying client's atomic.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag set by `disconnect()` and handed to the handler task.
    signal: Arc<AtomicBool>,
    /// Command channel to the handler; holds a placeholder sender until `connect()` replaces it.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; `None` before `connect()` and in clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker created at construction.
    auth_tracker: AuthTracker,
    /// Subscription state handed to the handler; its topics are replayed after a reconnection.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by instrument symbol.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    /// Per-coin set of asset context data types currently tracked.
    asset_context_subs: Arc<DashMap<Ustr, HashSet<AssetContextDataType>>>,
    /// Handle of the spawned handler task; `None` before `connect()` and in clones.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Optional account ID passed to the handler.
    account_id: Option<AccountId>,
}
84
85impl Clone for HyperliquidWebSocketClient {
86    fn clone(&self) -> Self {
87        Self {
88            url: self.url.clone(),
89            product_type: self.product_type,
90            connection_mode: Arc::clone(&self.connection_mode),
91            signal: Arc::clone(&self.signal),
92            cmd_tx: Arc::clone(&self.cmd_tx),
93            out_rx: None,
94            auth_tracker: self.auth_tracker.clone(),
95            subscriptions: self.subscriptions.clone(),
96            instruments: Arc::clone(&self.instruments),
97            bar_types: Arc::clone(&self.bar_types),
98            asset_context_subs: Arc::clone(&self.asset_context_subs),
99            task_handle: None,
100            account_id: self.account_id,
101        }
102    }
103}
104
105impl HyperliquidWebSocketClient {
106    /// Creates a new Hyperliquid WebSocket client without connecting.
107    ///
108    /// If `url` is `None`, the appropriate URL will be determined based on the `testnet` flag:
109    /// - `testnet=false`: `wss://api.hyperliquid.xyz/ws`
110    /// - `testnet=true`: `wss://api.hyperliquid-testnet.xyz/ws`
111    ///
112    /// The connection will be established when `connect()` is called.
113    pub fn new(
114        url: Option<String>,
115        testnet: bool,
116        product_type: HyperliquidProductType,
117        account_id: Option<AccountId>,
118    ) -> Self {
119        let url = url.unwrap_or_else(|| {
120            if testnet {
121                "wss://api.hyperliquid-testnet.xyz/ws".to_string()
122            } else {
123                "wss://api.hyperliquid.xyz/ws".to_string()
124            }
125        });
126        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
127            ConnectionMode::Closed as u8,
128        ))));
129        Self {
130            url,
131            product_type,
132            connection_mode,
133            signal: Arc::new(AtomicBool::new(false)),
134            auth_tracker: AuthTracker::new(),
135            subscriptions: SubscriptionState::new(':'),
136            instruments: Arc::new(DashMap::new()),
137            bar_types: Arc::new(DashMap::new()),
138            asset_context_subs: Arc::new(DashMap::new()),
139            cmd_tx: {
140                // Placeholder channel until connect() creates the real handler and replays queued instruments
141                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
142                Arc::new(tokio::sync::RwLock::new(tx))
143            },
144            out_rx: None,
145            task_handle: None,
146            account_id,
147        }
148    }
149
150    /// Establishes WebSocket connection and spawns the message handler.
151    pub async fn connect(&mut self) -> anyhow::Result<()> {
152        if self.is_active() {
153            log::warn!("WebSocket already connected");
154            return Ok(());
155        }
156        let (message_handler, raw_rx) = channel_message_handler();
157        let cfg = WebSocketConfig {
158            url: self.url.clone(),
159            headers: vec![],
160            heartbeat: Some(30),
161            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
162            reconnect_timeout_ms: Some(15_000),
163            reconnect_delay_initial_ms: Some(250),
164            reconnect_delay_max_ms: Some(5_000),
165            reconnect_backoff_factor: Some(2.0),
166            reconnect_jitter_ms: Some(200),
167            reconnect_max_attempts: None,
168        };
169        let client =
170            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
171
172        // Atomically swap connection state to the client's atomic
173        self.connection_mode.store(client.connection_mode_atomic());
174        log::info!("Hyperliquid WebSocket connected: {}", self.url);
175
176        // Create channels for handler communication
177        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
178        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
179
180        // Send SetClient command immediately
181        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
182            anyhow::bail!("Failed to send SetClient command: {e}");
183        }
184
185        // Initialize handler with existing instruments
186        let instruments_vec: Vec<InstrumentAny> = self
187            .instruments
188            .iter()
189            .map(|entry| entry.value().clone())
190            .collect();
191        if !instruments_vec.is_empty()
192            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
193        {
194            log::error!("Failed to send InitializeInstruments: {e}");
195        }
196
197        // Spawn handler task
198        let signal = Arc::clone(&self.signal);
199        let account_id = self.account_id;
200        let subscriptions = self.subscriptions.clone();
201        let cmd_tx_for_reconnect = cmd_tx.clone();
202
203        let stream_handle = get_runtime().spawn(async move {
204            let mut handler = FeedHandler::new(
205                signal,
206                cmd_rx,
207                raw_rx,
208                out_tx,
209                account_id,
210                subscriptions.clone(),
211            );
212
213            let resubscribe_all = || {
214                let topics = subscriptions.all_topics();
215                if topics.is_empty() {
216                    log::debug!("No active subscriptions to restore after reconnection");
217                    return;
218                }
219
220                log::info!(
221                    "Resubscribing to {} active subscriptions after reconnection",
222                    topics.len()
223                );
224                for topic in topics {
225                    match subscription_from_topic(&topic) {
226                        Ok(subscription) => {
227                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
228                                subscriptions: vec![subscription],
229                            }) {
230                                log::error!("Failed to send resubscribe command: {e}");
231                            }
232                        }
233                        Err(e) => {
234                            log::error!(
235                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
236                            );
237                        }
238                    }
239                }
240            };
241            loop {
242                match handler.next().await {
243                    Some(NautilusWsMessage::Reconnected) => {
244                        log::info!("WebSocket reconnected");
245                        resubscribe_all();
246                        continue;
247                    }
248                    Some(msg) => {
249                        if handler.send(msg).is_err() {
250                            log::error!("Failed to send message (receiver dropped)");
251                            break;
252                        }
253                    }
254                    None => {
255                        if handler.is_stopped() {
256                            log::debug!("Stop signal received, ending message processing");
257                            break;
258                        }
259                        log::warn!("WebSocket stream ended unexpectedly");
260                        break;
261                    }
262                }
263            }
264            log::debug!("Handler task completed");
265        });
266        self.task_handle = Some(Arc::new(stream_handle));
267        *self.cmd_tx.write().await = cmd_tx;
268        self.out_rx = Some(out_rx);
269        Ok(())
270    }
271
272    /// Disconnects the WebSocket connection.
273    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
274        log::info!("Disconnecting Hyperliquid WebSocket");
275        self.signal.store(true, Ordering::Relaxed);
276        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
277            log::debug!(
278                "Failed to send disconnect command (handler may already be shut down): {e}"
279            );
280        }
281        if let Some(task_handle) = self.task_handle.take() {
282            match Arc::try_unwrap(task_handle) {
283                Ok(handle) => {
284                    log::debug!("Waiting for task handle to complete");
285                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
286                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
287                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
288                        Err(_) => {
289                            log::warn!(
290                                "Timeout waiting for task handle, task may still be running"
291                            );
292                        }
293                    }
294                }
295                Err(arc_handle) => {
296                    log::debug!(
297                        "Cannot take ownership of task handle - other references exist, aborting task"
298                    );
299                    arc_handle.abort();
300                }
301            }
302        } else {
303            log::debug!("No task handle to await");
304        }
305        log::debug!("Disconnected");
306        Ok(())
307    }
308
309    /// Returns true if the WebSocket is actively connected.
310    pub fn is_active(&self) -> bool {
311        let mode = self.connection_mode.load();
312        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
313    }
314
315    /// Returns the URL of this WebSocket client.
316    pub fn url(&self) -> &str {
317        &self.url
318    }
319
320    /// Subscribe to order updates for a specific user address.
321    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
322        let subscription = SubscriptionRequest::OrderUpdates {
323            user: user.to_string(),
324        };
325        self.cmd_tx
326            .read()
327            .await
328            .send(HandlerCommand::Subscribe {
329                subscriptions: vec![subscription],
330            })
331            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
332        Ok(())
333    }
334
335    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
336    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
337        let subscription = SubscriptionRequest::UserEvents {
338            user: user.to_string(),
339        };
340        self.cmd_tx
341            .read()
342            .await
343            .send(HandlerCommand::Subscribe {
344                subscriptions: vec![subscription],
345            })
346            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
347        Ok(())
348    }
349
350    /// Subscribe to all user channels (order updates + user events) for convenience.
351    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
352        self.subscribe_order_updates(user).await?;
353        self.subscribe_user_events(user).await?;
354        Ok(())
355    }
356
357    /// Subscribe to trades for an instrument.
358    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
359        let instrument = self
360            .get_instrument(&instrument_id)
361            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
362        let coin = instrument.raw_symbol().inner();
363
364        let cmd_tx = self.cmd_tx.read().await;
365
366        // Update the handler's coin→instrument mapping for this subscription
367        cmd_tx
368            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
369            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
370
371        let subscription = SubscriptionRequest::Trades { coin };
372
373        cmd_tx
374            .send(HandlerCommand::Subscribe {
375                subscriptions: vec![subscription],
376            })
377            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
378        Ok(())
379    }
380
381    /// Unsubscribe from trades for an instrument.
382    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
383        let instrument = self
384            .get_instrument(&instrument_id)
385            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
386        let coin = instrument.raw_symbol().inner();
387
388        let subscription = SubscriptionRequest::Trades { coin };
389
390        self.cmd_tx
391            .read()
392            .await
393            .send(HandlerCommand::Unsubscribe {
394                subscriptions: vec![subscription],
395            })
396            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
397        Ok(())
398    }
399
400    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
401    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
402        let instrument = self
403            .get_instrument(&instrument_id)
404            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
405        let coin = instrument.raw_symbol().inner();
406
407        let cmd_tx = self.cmd_tx.read().await;
408
409        // Update the handler's coin→instrument mapping for this subscription
410        cmd_tx
411            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
412            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
413
414        let subscription = SubscriptionRequest::Bbo { coin };
415
416        cmd_tx
417            .send(HandlerCommand::Subscribe {
418                subscriptions: vec![subscription],
419            })
420            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
421        Ok(())
422    }
423
424    /// Unsubscribe from quote ticks for an instrument.
425    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
426        let instrument = self
427            .get_instrument(&instrument_id)
428            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
429        let coin = instrument.raw_symbol().inner();
430
431        let subscription = SubscriptionRequest::Bbo { coin };
432
433        self.cmd_tx
434            .read()
435            .await
436            .send(HandlerCommand::Unsubscribe {
437                subscriptions: vec![subscription],
438            })
439            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
440        Ok(())
441    }
442
443    /// Subscribe to L2 order book for an instrument.
444    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
445        let instrument = self
446            .get_instrument(&instrument_id)
447            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
448        let coin = instrument.raw_symbol().inner();
449
450        let cmd_tx = self.cmd_tx.read().await;
451
452        // Update the handler's coin→instrument mapping for this subscription
453        cmd_tx
454            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
455            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
456
457        let subscription = SubscriptionRequest::L2Book {
458            coin,
459            mantissa: None,
460            n_sig_figs: None,
461        };
462
463        cmd_tx
464            .send(HandlerCommand::Subscribe {
465                subscriptions: vec![subscription],
466            })
467            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
468        Ok(())
469    }
470
471    /// Unsubscribe from L2 order book for an instrument.
472    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
473        let instrument = self
474            .get_instrument(&instrument_id)
475            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
476        let coin = instrument.raw_symbol().inner();
477
478        let subscription = SubscriptionRequest::L2Book {
479            coin,
480            mantissa: None,
481            n_sig_figs: None,
482        };
483
484        self.cmd_tx
485            .read()
486            .await
487            .send(HandlerCommand::Unsubscribe {
488                subscriptions: vec![subscription],
489            })
490            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
491        Ok(())
492    }
493
494    /// Subscribe to candle/bar data for a specific coin and interval.
495    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
496        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
497        let instrument = self
498            .get_instrument(&bar_type.instrument_id())
499            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
500        let coin = instrument.raw_symbol().inner();
501        let interval = bar_type_to_interval(&bar_type)?;
502        let subscription = SubscriptionRequest::Candle { coin, interval };
503
504        // Cache the bar type for parsing using canonical key
505        let key = format!("candle:{coin}:{interval}");
506        self.bar_types.insert(key.clone(), bar_type);
507
508        let cmd_tx = self.cmd_tx.read().await;
509
510        cmd_tx
511            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
512            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
513
514        cmd_tx
515            .send(HandlerCommand::AddBarType { key, bar_type })
516            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
517
518        cmd_tx
519            .send(HandlerCommand::Subscribe {
520                subscriptions: vec![subscription],
521            })
522            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
523        Ok(())
524    }
525
526    /// Unsubscribe from candle/bar data.
527    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
528        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
529        let instrument = self
530            .get_instrument(&bar_type.instrument_id())
531            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
532        let coin = instrument.raw_symbol().inner();
533        let interval = bar_type_to_interval(&bar_type)?;
534        let subscription = SubscriptionRequest::Candle { coin, interval };
535
536        let key = format!("candle:{coin}:{interval}");
537        self.bar_types.remove(&key);
538
539        let cmd_tx = self.cmd_tx.read().await;
540
541        cmd_tx
542            .send(HandlerCommand::RemoveBarType { key })
543            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
544
545        cmd_tx
546            .send(HandlerCommand::Unsubscribe {
547                subscriptions: vec![subscription],
548            })
549            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
550        Ok(())
551    }
552
    /// Subscribe to mark price updates for an instrument.
    ///
    /// Delegates to `subscribe_asset_context_data` with [`AssetContextDataType::MarkPrice`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
            .await
    }
558
    /// Unsubscribe from mark price updates for an instrument.
    ///
    /// Delegates to `unsubscribe_asset_context_data` with [`AssetContextDataType::MarkPrice`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
            .await
    }
564
    /// Subscribe to index/oracle price updates for an instrument.
    ///
    /// Delegates to `subscribe_asset_context_data` with [`AssetContextDataType::IndexPrice`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
            .await
    }
570
    /// Unsubscribe from index/oracle price updates for an instrument.
    ///
    /// Delegates to `unsubscribe_asset_context_data` with [`AssetContextDataType::IndexPrice`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> anyhow::Result<()> {
        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
            .await
    }
579
    /// Subscribe to funding rate updates for an instrument.
    ///
    /// Delegates to `subscribe_asset_context_data` with [`AssetContextDataType::FundingRate`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
            .await
    }
585
    /// Unsubscribe from funding rate updates for an instrument.
    ///
    /// Delegates to `unsubscribe_asset_context_data` with [`AssetContextDataType::FundingRate`].
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument is not cached or a handler command cannot be sent.
    pub async fn unsubscribe_funding_rates(
        &self,
        instrument_id: InstrumentId,
    ) -> anyhow::Result<()> {
        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
            .await
    }
594
595    async fn subscribe_asset_context_data(
596        &self,
597        instrument_id: InstrumentId,
598        data_type: AssetContextDataType,
599    ) -> anyhow::Result<()> {
600        let instrument = self
601            .get_instrument(&instrument_id)
602            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
603        let coin = instrument.raw_symbol().inner();
604
605        let mut entry = self.asset_context_subs.entry(coin).or_default();
606        let is_first_subscription = entry.is_empty();
607        entry.insert(data_type);
608        let data_types = entry.clone();
609        drop(entry);
610
611        let cmd_tx = self.cmd_tx.read().await;
612
613        cmd_tx
614            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
615            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
616
617        if is_first_subscription {
618            log::debug!(
619                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
620            );
621            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
622
623            cmd_tx
624                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
625                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
626
627            cmd_tx
628                .send(HandlerCommand::Subscribe {
629                    subscriptions: vec![subscription],
630                })
631                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
632        } else {
633            log::debug!(
634                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
635            );
636        }
637
638        Ok(())
639    }
640
641    async fn unsubscribe_asset_context_data(
642        &self,
643        instrument_id: InstrumentId,
644        data_type: AssetContextDataType,
645    ) -> anyhow::Result<()> {
646        let instrument = self
647            .get_instrument(&instrument_id)
648            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
649        let coin = instrument.raw_symbol().inner();
650
651        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
652            entry.remove(&data_type);
653            let should_unsubscribe = entry.is_empty();
654            let data_types = entry.clone();
655            drop(entry);
656
657            let cmd_tx = self.cmd_tx.read().await;
658
659            if should_unsubscribe {
660                self.asset_context_subs.remove(&coin);
661
662                log::debug!(
663                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
664                );
665                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
666
667                cmd_tx
668                    .send(HandlerCommand::UpdateAssetContextSubs {
669                        coin,
670                        data_types: HashSet::new(),
671                    })
672                    .map_err(|e| {
673                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
674                    })?;
675
676                cmd_tx
677                    .send(HandlerCommand::Unsubscribe {
678                        subscriptions: vec![subscription],
679                    })
680                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
681            } else {
682                log::debug!(
683                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
684                );
685
686                cmd_tx
687                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
688                    .map_err(|e| {
689                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
690                    })?;
691            }
692        }
693
694        Ok(())
695    }
696
697    /// Caches multiple instruments.
698    ///
699    /// Clears the existing cache first, then adds all provided instruments.
700    /// Instruments are keyed by their full Nautilus symbol (e.g., "BTC-USD-PERP").
701    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
702        self.instruments.clear();
703        for inst in instruments {
704            let symbol = inst.symbol().inner();
705            self.instruments.insert(symbol, inst.clone());
706        }
707        log::info!(
708            "Hyperliquid instrument cache initialized with {} instruments",
709            self.instruments.len()
710        );
711    }
712
713    /// Caches a single instrument.
714    ///
715    /// Any existing instrument with the same symbol will be replaced.
716    pub fn cache_instrument(&self, instrument: InstrumentAny) {
717        let symbol = instrument.symbol().inner();
718        self.instruments.insert(symbol, instrument.clone());
719
720        // Before connect() the handler isn't running; this send will fail and that's expected
721        // because connect() replays the instruments via InitializeInstruments
722        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
723            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
724        }
725    }
726
727    /// Gets an instrument from the cache by ID.
728    ///
729    /// Looks up the instrument by its full Nautilus symbol (e.g., "BTC-USD-PERP").
730    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
731        self.instruments
732            .get(&id.symbol.inner())
733            .map(|e| e.value().clone())
734    }
735
736    /// Gets an instrument from the cache by symbol.
737    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
738        self.instruments.get(symbol).map(|e| e.value().clone())
739    }
740
    /// Returns the count of confirmed subscriptions.
    ///
    /// Delegates to `len()` on the subscription state held by this client.
    // NOTE(review): presumably `len()` counts only confirmed (not pending) topics — confirm
    // against `SubscriptionState`
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
745
746    /// Gets a bar type from the cache by coin and interval.
747    ///
748    /// This looks up the subscription key created when subscribing to bars.
749    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
750        // Use canonical key format matching subscribe_bars
751        let key = format!("candle:{coin}:{interval}");
752        self.bar_types.get(&key).map(|entry| *entry.value())
753    }
754
755    /// Receives the next message from the WebSocket handler.
756    ///
757    /// Returns `None` if the handler has disconnected or the receiver was already taken.
758    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
759        if let Some(ref mut rx) = self.out_rx {
760            rx.recv().await
761        } else {
762            None
763        }
764    }
765}
766
767/// Reconstructs a subscription request from a topic string.
768fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
769    let parts: Vec<&str> = topic.split(':').collect();
770
771    match parts.first() {
772        Some(&"allMids") => {
773            let dex = parts.get(1).map(|s| (*s).to_string());
774            Ok(SubscriptionRequest::AllMids { dex })
775        }
776        Some(&"notification") => Ok(SubscriptionRequest::Notification {
777            user: (*parts.get(1).context("Missing user")?).to_string(),
778        }),
779        Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
780            user: (*parts.get(1).context("Missing user")?).to_string(),
781        }),
782        Some(&"candle") => {
783            let coin = Ustr::from(parts.get(1).context("Missing coin")?);
784            let interval_str = parts.get(2).context("Missing interval")?;
785            let interval = HyperliquidBarInterval::from_str(interval_str)?;
786            Ok(SubscriptionRequest::Candle { coin, interval })
787        }
788        Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
789            coin: Ustr::from(parts.get(1).context("Missing coin")?),
790            mantissa: None,
791            n_sig_figs: None,
792        }),
793        Some(&"trades") => Ok(SubscriptionRequest::Trades {
794            coin: Ustr::from(parts.get(1).context("Missing coin")?),
795        }),
796        Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
797            user: (*parts.get(1).context("Missing user")?).to_string(),
798        }),
799        Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
800            user: (*parts.get(1).context("Missing user")?).to_string(),
801        }),
802        Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
803            user: (*parts.get(1).context("Missing user")?).to_string(),
804            aggregate_by_time: None,
805        }),
806        Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
807            user: (*parts.get(1).context("Missing user")?).to_string(),
808        }),
809        Some(&"userNonFundingLedgerUpdates") => {
810            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
811                user: (*parts.get(1).context("Missing user")?).to_string(),
812            })
813        }
814        Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
815            coin: Ustr::from(parts.get(1).context("Missing coin")?),
816        }),
817        Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
818            coin: Ustr::from(parts.get(1).context("Missing coin")?),
819        }),
820        Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
821            user: (*parts.get(1).context("Missing user")?).to_string(),
822            coin: (*parts.get(2).context("Missing coin")?).to_string(),
823        }),
824        Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
825            user: (*parts.get(1).context("Missing user")?).to_string(),
826        }),
827        Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
828            user: (*parts.get(1).context("Missing user")?).to_string(),
829        }),
830        Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
831            coin: Ustr::from(parts.get(1).context("Missing coin")?),
832        }),
833        Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
834        None => anyhow::bail!("Empty topic string"),
835    }
836}
837
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Builds the canonical topic key that identifies a subscription request.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        match sub {
            SubscriptionRequest::AllMids { dex: Some(dex) } => format!("allMids:{dex}"),
            SubscriptionRequest::AllMids { dex: None } => "allMids".to_string(),
            SubscriptionRequest::Notification { user } => format!("notification:{user}"),
            SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
            SubscriptionRequest::Candle { coin, interval } => {
                let interval = interval.as_str();
                format!("candle:{coin}:{interval}")
            }
            SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
            SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
            SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
            SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
            SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
            SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
            SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
                format!("userNonFundingLedgerUpdates:{user}")
            }
            SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
            SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
                format!("activeSpotAssetCtx:{coin}")
            }
            SubscriptionRequest::ActiveAssetData { user, coin } => {
                format!("activeAssetData:{user}:{coin}")
            }
            SubscriptionRequest::UserTwapSliceFills { user } => {
                format!("userTwapSliceFills:{user}")
            }
            SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
            SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
        }
    }

    /// Each request variant maps to its expected `channel:argument` topic.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual = subscription_topic(&subscription);
        assert_eq!(actual, expected_topic);
    }

    /// Different channels for the same coin must not collide on one topic.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    /// A topic parsed back into a request must regenerate the same topic.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let round_tripped =
            subscription_from_topic(&original_topic).expect("Failed to reconstruct");

        assert_eq!(subscription_topic(&round_tripped), original_topic);
    }

    /// Candle topics carry both the coin and the interval string.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}