Skip to main content

nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30    data::BarType,
31    identifiers::{AccountId, ClientOrderId, InstrumentId},
32    instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35    mode::ConnectionMode,
36    websocket::{
37        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38    },
39};
40use ustr::Ustr;
41
42use crate::{
43    common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44    websocket::{
45        handler::{FeedHandler, HandlerCommand},
46        messages::{NautilusWsMessage, SubscriptionRequest},
47    },
48};
49
/// JSON payload configured as the WebSocket heartbeat message when connecting.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
51
52/// Represents the different data types available from asset context subscriptions.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54pub(super) enum AssetContextDataType {
55    MarkPrice,
56    IndexPrice,
57    FundingRate,
58}
59
60/// Hyperliquid WebSocket client following the BitMEX pattern.
61///
62/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
63/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
64#[derive(Debug)]
65#[cfg_attr(
66    feature = "python",
67    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.hyperliquid")
68)]
69pub struct HyperliquidWebSocketClient {
70    url: String,
71    connection_mode: Arc<ArcSwap<AtomicU8>>,
72    signal: Arc<AtomicBool>,
73    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
74    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
75    auth_tracker: AuthTracker,
76    subscriptions: SubscriptionState,
77    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
78    bar_types: Arc<DashMap<String, BarType>>,
79    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
80    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
81    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
82    account_id: Option<AccountId>,
83}
84
85impl Clone for HyperliquidWebSocketClient {
86    fn clone(&self) -> Self {
87        Self {
88            url: self.url.clone(),
89            connection_mode: Arc::clone(&self.connection_mode),
90            signal: Arc::clone(&self.signal),
91            cmd_tx: Arc::clone(&self.cmd_tx),
92            out_rx: None,
93            auth_tracker: self.auth_tracker.clone(),
94            subscriptions: self.subscriptions.clone(),
95            instruments: Arc::clone(&self.instruments),
96            bar_types: Arc::clone(&self.bar_types),
97            asset_context_subs: Arc::clone(&self.asset_context_subs),
98            cloid_cache: Arc::clone(&self.cloid_cache),
99            task_handle: None,
100            account_id: self.account_id,
101        }
102    }
103}
104
105impl HyperliquidWebSocketClient {
106    /// Creates a new Hyperliquid WebSocket client without connecting.
107    ///
108    /// If `url` is `None`, the appropriate URL will be determined based on the `testnet` flag:
109    /// - `testnet=false`: `wss://api.hyperliquid.xyz/ws`
110    /// - `testnet=true`: `wss://api.hyperliquid-testnet.xyz/ws`
111    ///
112    /// The connection will be established when `connect()` is called.
113    pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
114        let url = url.unwrap_or_else(|| {
115            if testnet {
116                "wss://api.hyperliquid-testnet.xyz/ws".to_string()
117            } else {
118                "wss://api.hyperliquid.xyz/ws".to_string()
119            }
120        });
121        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
122            ConnectionMode::Closed as u8,
123        ))));
124        Self {
125            url,
126            connection_mode,
127            signal: Arc::new(AtomicBool::new(false)),
128            auth_tracker: AuthTracker::new(),
129            subscriptions: SubscriptionState::new(':'),
130            instruments: Arc::new(DashMap::new()),
131            bar_types: Arc::new(DashMap::new()),
132            asset_context_subs: Arc::new(DashMap::new()),
133            cloid_cache: Arc::new(DashMap::new()),
134            cmd_tx: {
135                // Placeholder channel until connect() creates the real handler and replays queued instruments
136                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
137                Arc::new(tokio::sync::RwLock::new(tx))
138            },
139            out_rx: None,
140            task_handle: None,
141            account_id,
142        }
143    }
144
145    /// Establishes WebSocket connection and spawns the message handler.
146    pub async fn connect(&mut self) -> anyhow::Result<()> {
147        if self.is_active() {
148            log::warn!("WebSocket already connected");
149            return Ok(());
150        }
151        let (message_handler, raw_rx) = channel_message_handler();
152        let cfg = WebSocketConfig {
153            url: self.url.clone(),
154            headers: vec![],
155            heartbeat: Some(30),
156            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
157            reconnect_timeout_ms: Some(15_000),
158            reconnect_delay_initial_ms: Some(250),
159            reconnect_delay_max_ms: Some(5_000),
160            reconnect_backoff_factor: Some(2.0),
161            reconnect_jitter_ms: Some(200),
162            reconnect_max_attempts: None,
163        };
164        let client =
165            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
166
167        // Create channels for handler communication
168        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
169        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
170
171        // Update cmd_tx before connection_mode to avoid race where is_active() returns
172        // true but subscriptions still go to the old placeholder channel
173        *self.cmd_tx.write().await = cmd_tx.clone();
174        self.out_rx = Some(out_rx);
175
176        self.connection_mode.store(client.connection_mode_atomic());
177        log::info!("Hyperliquid WebSocket connected: {}", self.url);
178
179        // Send SetClient command immediately
180        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
181            anyhow::bail!("Failed to send SetClient command: {e}");
182        }
183
184        // Initialize handler with existing instruments
185        let instruments_vec: Vec<InstrumentAny> = self
186            .instruments
187            .iter()
188            .map(|entry| entry.value().clone())
189            .collect();
190        if !instruments_vec.is_empty()
191            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
192        {
193            log::error!("Failed to send InitializeInstruments: {e}");
194        }
195
196        // Spawn handler task
197        let signal = Arc::clone(&self.signal);
198        let account_id = self.account_id;
199        let subscriptions = self.subscriptions.clone();
200        let cmd_tx_for_reconnect = cmd_tx.clone();
201        let cloid_cache = Arc::clone(&self.cloid_cache);
202
203        let stream_handle = get_runtime().spawn(async move {
204            let mut handler = FeedHandler::new(
205                signal,
206                cmd_rx,
207                raw_rx,
208                out_tx,
209                account_id,
210                subscriptions.clone(),
211                cloid_cache,
212            );
213
214            let resubscribe_all = || {
215                let topics = subscriptions.all_topics();
216                if topics.is_empty() {
217                    log::debug!("No active subscriptions to restore after reconnection");
218                    return;
219                }
220
221                log::info!(
222                    "Resubscribing to {} active subscriptions after reconnection",
223                    topics.len()
224                );
225                for topic in topics {
226                    match subscription_from_topic(&topic) {
227                        Ok(subscription) => {
228                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
229                                subscriptions: vec![subscription],
230                            }) {
231                                log::error!("Failed to send resubscribe command: {e}");
232                            }
233                        }
234                        Err(e) => {
235                            log::error!(
236                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
237                            );
238                        }
239                    }
240                }
241            };
242            loop {
243                match handler.next().await {
244                    Some(NautilusWsMessage::Reconnected) => {
245                        log::info!("WebSocket reconnected");
246                        resubscribe_all();
247                        continue;
248                    }
249                    Some(msg) => {
250                        if handler.send(msg).is_err() {
251                            log::error!("Failed to send message (receiver dropped)");
252                            break;
253                        }
254                    }
255                    None => {
256                        if handler.is_stopped() {
257                            log::debug!("Stop signal received, ending message processing");
258                            break;
259                        }
260                        log::warn!("WebSocket stream ended unexpectedly");
261                        break;
262                    }
263                }
264            }
265            log::debug!("Handler task completed");
266        });
267        self.task_handle = Some(Arc::new(stream_handle));
268        Ok(())
269    }
270
271    /// Disconnects the WebSocket connection.
272    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
273        log::info!("Disconnecting Hyperliquid WebSocket");
274        self.signal.store(true, Ordering::Relaxed);
275        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
276            log::debug!(
277                "Failed to send disconnect command (handler may already be shut down): {e}"
278            );
279        }
280        if let Some(task_handle) = self.task_handle.take() {
281            match Arc::try_unwrap(task_handle) {
282                Ok(handle) => {
283                    log::debug!("Waiting for task handle to complete");
284                    let abort_handle = handle.abort_handle();
285                    tokio::select! {
286                        result = handle => {
287                            match result {
288                                Ok(()) => log::debug!("Task handle completed successfully"),
289                                Err(e) if e.is_cancelled() => {
290                                    log::debug!("Task was cancelled");
291                                }
292                                Err(e) => log::error!("Task handle encountered an error: {e:?}"),
293                            }
294                        }
295                        () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
296                            log::warn!("Timeout waiting for task handle, aborting task");
297                            abort_handle.abort();
298                        }
299                    }
300                }
301                Err(arc_handle) => {
302                    log::debug!(
303                        "Cannot take ownership of task handle - other references exist, aborting task"
304                    );
305                    arc_handle.abort();
306                }
307            }
308        } else {
309            log::debug!("No task handle to await");
310        }
311        log::debug!("Disconnected");
312        Ok(())
313    }
314
315    /// Returns true if the WebSocket is actively connected.
316    pub fn is_active(&self) -> bool {
317        let mode = self.connection_mode.load();
318        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
319    }
320
321    /// Returns the URL of this WebSocket client.
322    pub fn url(&self) -> &str {
323        &self.url
324    }
325
326    /// Caches multiple instruments.
327    ///
328    /// Clears the existing cache first, then adds all provided instruments.
329    /// Instruments are keyed by their raw_symbol which is unique per instrument:
330    /// - Perps use base currency (e.g., "BTC")
331    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
332    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
333        self.instruments.clear();
334        for inst in instruments {
335            let coin = inst.raw_symbol().inner();
336            self.instruments.insert(coin, inst);
337        }
338        log::info!(
339            "Hyperliquid instrument cache initialized with {} instruments",
340            self.instruments.len()
341        );
342    }
343
344    /// Caches a single instrument.
345    ///
346    /// Any existing instrument with the same raw_symbol will be replaced.
347    pub fn cache_instrument(&self, instrument: InstrumentAny) {
348        let coin = instrument.raw_symbol().inner();
349        self.instruments.insert(coin, instrument.clone());
350
351        // Before connect() the handler isn't running; this send will fail and that's expected
352        // because connect() replays the instruments via InitializeInstruments
353        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
354            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
355        }
356    }
357
358    /// Caches spot fill coin mappings for instrument lookup.
359    ///
360    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
361    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
362    /// This mapping allows the handler to look up instruments from spot fills.
363    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
364        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
365            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
366        }
367    }
368
369    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
370    ///
371    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
372    /// This mapping allows WebSocket order status and fill reports to be resolved back to
373    /// the original client_order_id.
374    ///
375    /// This writes directly to a shared cache that the handler reads from, avoiding any
376    /// race conditions between caching and WebSocket message processing.
377    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
378        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
379        self.cloid_cache.insert(cloid, client_order_id);
380    }
381
382    /// Removes a cloid mapping from the cache.
383    ///
384    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
385    /// to prevent unbounded memory growth in long-running sessions.
386    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
387        if self.cloid_cache.remove(cloid).is_some() {
388            log::debug!("Removed cloid mapping: {cloid}");
389        }
390    }
391
392    /// Clears all cloid mappings from the cache.
393    ///
394    /// Useful for cleanup during reconnection or shutdown.
395    pub fn clear_cloid_cache(&self) {
396        let count = self.cloid_cache.len();
397        self.cloid_cache.clear();
398        if count > 0 {
399            log::debug!("Cleared {count} cloid mappings from cache");
400        }
401    }
402
403    /// Returns the number of cloid mappings in the cache.
404    #[must_use]
405    pub fn cloid_cache_len(&self) -> usize {
406        self.cloid_cache.len()
407    }
408
409    /// Looks up a client_order_id by its cloid hash.
410    ///
411    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
412    #[must_use]
413    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
414        self.cloid_cache.get(cloid).map(|entry| *entry.value())
415    }
416
417    /// Gets an instrument from the cache by ID.
418    ///
419    /// Searches the cache for a matching instrument ID.
420    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
421        self.instruments
422            .iter()
423            .find(|entry| entry.value().id() == *id)
424            .map(|entry| entry.value().clone())
425    }
426
427    /// Gets an instrument from the cache by raw_symbol (coin).
428    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
429        self.instruments.get(symbol).map(|e| e.value().clone())
430    }
431
432    /// Returns the count of confirmed subscriptions.
433    pub fn subscription_count(&self) -> usize {
434        self.subscriptions.len()
435    }
436
437    /// Gets a bar type from the cache by coin and interval.
438    ///
439    /// This looks up the subscription key created when subscribing to bars.
440    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
441        // Use canonical key format matching subscribe_bars
442        let key = format!("candle:{coin}:{interval}");
443        self.bar_types.get(&key).map(|entry| *entry.value())
444    }
445
446    /// Subscribe to L2 order book for an instrument.
447    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
448        let instrument = self
449            .get_instrument(&instrument_id)
450            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
451        let coin = instrument.raw_symbol().inner();
452
453        let cmd_tx = self.cmd_tx.read().await;
454
455        // Update the handler's coin→instrument mapping for this subscription
456        cmd_tx
457            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
458            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
459
460        let subscription = SubscriptionRequest::L2Book {
461            coin,
462            mantissa: None,
463            n_sig_figs: None,
464        };
465
466        cmd_tx
467            .send(HandlerCommand::Subscribe {
468                subscriptions: vec![subscription],
469            })
470            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
471        Ok(())
472    }
473
474    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
475    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
476        let instrument = self
477            .get_instrument(&instrument_id)
478            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
479        let coin = instrument.raw_symbol().inner();
480
481        let cmd_tx = self.cmd_tx.read().await;
482
483        // Update the handler's coin→instrument mapping for this subscription
484        cmd_tx
485            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
486            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
487
488        let subscription = SubscriptionRequest::Bbo { coin };
489
490        cmd_tx
491            .send(HandlerCommand::Subscribe {
492                subscriptions: vec![subscription],
493            })
494            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
495        Ok(())
496    }
497
498    /// Subscribe to trades for an instrument.
499    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
500        let instrument = self
501            .get_instrument(&instrument_id)
502            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
503        let coin = instrument.raw_symbol().inner();
504
505        let cmd_tx = self.cmd_tx.read().await;
506
507        // Update the handler's coin→instrument mapping for this subscription
508        cmd_tx
509            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
510            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
511
512        let subscription = SubscriptionRequest::Trades { coin };
513
514        cmd_tx
515            .send(HandlerCommand::Subscribe {
516                subscriptions: vec![subscription],
517            })
518            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
519        Ok(())
520    }
521
522    /// Subscribe to mark price updates for an instrument.
523    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
524        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
525            .await
526    }
527
528    /// Subscribe to index/oracle price updates for an instrument.
529    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
530        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
531            .await
532    }
533
534    /// Subscribe to candle/bar data for a specific coin and interval.
535    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
536        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
537        let instrument = self
538            .get_instrument(&bar_type.instrument_id())
539            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
540        let coin = instrument.raw_symbol().inner();
541        let interval = bar_type_to_interval(&bar_type)?;
542        let subscription = SubscriptionRequest::Candle { coin, interval };
543
544        // Cache the bar type for parsing using canonical key
545        let key = format!("candle:{coin}:{interval}");
546        self.bar_types.insert(key.clone(), bar_type);
547
548        let cmd_tx = self.cmd_tx.read().await;
549
550        cmd_tx
551            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
552            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
553
554        cmd_tx
555            .send(HandlerCommand::AddBarType { key, bar_type })
556            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
557
558        cmd_tx
559            .send(HandlerCommand::Subscribe {
560                subscriptions: vec![subscription],
561            })
562            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
563        Ok(())
564    }
565
566    /// Subscribe to funding rate updates for an instrument.
567    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
568        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
569            .await
570    }
571
572    /// Subscribe to order updates for a specific user address.
573    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
574        let subscription = SubscriptionRequest::OrderUpdates {
575            user: user.to_string(),
576        };
577        self.cmd_tx
578            .read()
579            .await
580            .send(HandlerCommand::Subscribe {
581                subscriptions: vec![subscription],
582            })
583            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
584        Ok(())
585    }
586
587    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
588    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
589        let subscription = SubscriptionRequest::UserEvents {
590            user: user.to_string(),
591        };
592        self.cmd_tx
593            .read()
594            .await
595            .send(HandlerCommand::Subscribe {
596                subscriptions: vec![subscription],
597            })
598            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
599        Ok(())
600    }
601
602    /// Subscribe to user fills for a specific user address.
603    ///
604    /// Note: This channel is redundant with `userEvents` which already includes fills.
605    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
606    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
607        let subscription = SubscriptionRequest::UserFills {
608            user: user.to_string(),
609            aggregate_by_time: None,
610        };
611        self.cmd_tx
612            .read()
613            .await
614            .send(HandlerCommand::Subscribe {
615                subscriptions: vec![subscription],
616            })
617            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
618        Ok(())
619    }
620
621    /// Subscribe to all user channels (order updates + user events) for convenience.
622    ///
623    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
624    /// separately to avoid duplicate fill messages.
625    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
626        self.subscribe_order_updates(user).await?;
627        self.subscribe_user_events(user).await?;
628        Ok(())
629    }
630
631    /// Unsubscribe from L2 order book for an instrument.
632    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
633        let instrument = self
634            .get_instrument(&instrument_id)
635            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
636        let coin = instrument.raw_symbol().inner();
637
638        let subscription = SubscriptionRequest::L2Book {
639            coin,
640            mantissa: None,
641            n_sig_figs: None,
642        };
643
644        self.cmd_tx
645            .read()
646            .await
647            .send(HandlerCommand::Unsubscribe {
648                subscriptions: vec![subscription],
649            })
650            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
651        Ok(())
652    }
653
654    /// Unsubscribe from quote ticks for an instrument.
655    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
656        let instrument = self
657            .get_instrument(&instrument_id)
658            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
659        let coin = instrument.raw_symbol().inner();
660
661        let subscription = SubscriptionRequest::Bbo { coin };
662
663        self.cmd_tx
664            .read()
665            .await
666            .send(HandlerCommand::Unsubscribe {
667                subscriptions: vec![subscription],
668            })
669            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
670        Ok(())
671    }
672
673    /// Unsubscribe from trades for an instrument.
674    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
675        let instrument = self
676            .get_instrument(&instrument_id)
677            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
678        let coin = instrument.raw_symbol().inner();
679
680        let subscription = SubscriptionRequest::Trades { coin };
681
682        self.cmd_tx
683            .read()
684            .await
685            .send(HandlerCommand::Unsubscribe {
686                subscriptions: vec![subscription],
687            })
688            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
689        Ok(())
690    }
691
692    /// Unsubscribe from mark price updates for an instrument.
693    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
694        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
695            .await
696    }
697
698    /// Unsubscribe from index/oracle price updates for an instrument.
699    pub async fn unsubscribe_index_prices(
700        &self,
701        instrument_id: InstrumentId,
702    ) -> anyhow::Result<()> {
703        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
704            .await
705    }
706
707    /// Unsubscribe from candle/bar data.
708    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
709        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
710        let instrument = self
711            .get_instrument(&bar_type.instrument_id())
712            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
713        let coin = instrument.raw_symbol().inner();
714        let interval = bar_type_to_interval(&bar_type)?;
715        let subscription = SubscriptionRequest::Candle { coin, interval };
716
717        let key = format!("candle:{coin}:{interval}");
718        self.bar_types.remove(&key);
719
720        let cmd_tx = self.cmd_tx.read().await;
721
722        cmd_tx
723            .send(HandlerCommand::RemoveBarType { key })
724            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
725
726        cmd_tx
727            .send(HandlerCommand::Unsubscribe {
728                subscriptions: vec![subscription],
729            })
730            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
731        Ok(())
732    }
733
734    /// Unsubscribe from funding rate updates for an instrument.
735    pub async fn unsubscribe_funding_rates(
736        &self,
737        instrument_id: InstrumentId,
738    ) -> anyhow::Result<()> {
739        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
740            .await
741    }
742
743    async fn subscribe_asset_context_data(
744        &self,
745        instrument_id: InstrumentId,
746        data_type: AssetContextDataType,
747    ) -> anyhow::Result<()> {
748        let instrument = self
749            .get_instrument(&instrument_id)
750            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
751        let coin = instrument.raw_symbol().inner();
752
753        let mut entry = self.asset_context_subs.entry(coin).or_default();
754        let is_first_subscription = entry.is_empty();
755        entry.insert(data_type);
756        let data_types = entry.clone();
757        drop(entry);
758
759        let cmd_tx = self.cmd_tx.read().await;
760
761        cmd_tx
762            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
763            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
764
765        if is_first_subscription {
766            log::debug!(
767                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
768            );
769            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
770
771            cmd_tx
772                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
773                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
774
775            cmd_tx
776                .send(HandlerCommand::Subscribe {
777                    subscriptions: vec![subscription],
778                })
779                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
780        } else {
781            log::debug!(
782                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
783            );
784        }
785
786        Ok(())
787    }
788
789    async fn unsubscribe_asset_context_data(
790        &self,
791        instrument_id: InstrumentId,
792        data_type: AssetContextDataType,
793    ) -> anyhow::Result<()> {
794        let instrument = self
795            .get_instrument(&instrument_id)
796            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
797        let coin = instrument.raw_symbol().inner();
798
799        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
800            entry.remove(&data_type);
801            let should_unsubscribe = entry.is_empty();
802            let data_types = entry.clone();
803            drop(entry);
804
805            let cmd_tx = self.cmd_tx.read().await;
806
807            if should_unsubscribe {
808                self.asset_context_subs.remove(&coin);
809
810                log::debug!(
811                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
812                );
813                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
814
815                cmd_tx
816                    .send(HandlerCommand::UpdateAssetContextSubs {
817                        coin,
818                        data_types: AHashSet::new(),
819                    })
820                    .map_err(|e| {
821                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
822                    })?;
823
824                cmd_tx
825                    .send(HandlerCommand::Unsubscribe {
826                        subscriptions: vec![subscription],
827                    })
828                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
829            } else {
830                log::debug!(
831                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
832                );
833
834                cmd_tx
835                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
836                    .map_err(|e| {
837                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
838                    })?;
839            }
840        }
841
842        Ok(())
843    }
844
845    /// Receives the next message from the WebSocket handler.
846    ///
847    /// Returns `None` if the handler has disconnected or the receiver was already taken.
848    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
849        if let Some(ref mut rx) = self.out_rx {
850            rx.recv().await
851        } else {
852            None
853        }
854    }
855}
856
857/// Reconstructs a subscription request from a topic string.
858fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
859    let parts: Vec<&str> = topic.split(':').collect();
860
861    match parts.first() {
862        Some(&"allMids") => {
863            let dex = parts.get(1).map(|s| (*s).to_string());
864            Ok(SubscriptionRequest::AllMids { dex })
865        }
866        Some(&"notification") => Ok(SubscriptionRequest::Notification {
867            user: (*parts.get(1).context("Missing user")?).to_string(),
868        }),
869        Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
870            user: (*parts.get(1).context("Missing user")?).to_string(),
871        }),
872        Some(&"candle") => {
873            let coin = Ustr::from(parts.get(1).context("Missing coin")?);
874            let interval_str = parts.get(2).context("Missing interval")?;
875            let interval = HyperliquidBarInterval::from_str(interval_str)?;
876            Ok(SubscriptionRequest::Candle { coin, interval })
877        }
878        Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
879            coin: Ustr::from(parts.get(1).context("Missing coin")?),
880            mantissa: None,
881            n_sig_figs: None,
882        }),
883        Some(&"trades") => Ok(SubscriptionRequest::Trades {
884            coin: Ustr::from(parts.get(1).context("Missing coin")?),
885        }),
886        Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
887            user: (*parts.get(1).context("Missing user")?).to_string(),
888        }),
889        Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
890            user: (*parts.get(1).context("Missing user")?).to_string(),
891        }),
892        Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
893            user: (*parts.get(1).context("Missing user")?).to_string(),
894            aggregate_by_time: None,
895        }),
896        Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
897            user: (*parts.get(1).context("Missing user")?).to_string(),
898        }),
899        Some(&"userNonFundingLedgerUpdates") => {
900            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
901                user: (*parts.get(1).context("Missing user")?).to_string(),
902            })
903        }
904        Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
905            coin: Ustr::from(parts.get(1).context("Missing coin")?),
906        }),
907        Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
908            coin: Ustr::from(parts.get(1).context("Missing coin")?),
909        }),
910        Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
911            user: (*parts.get(1).context("Missing user")?).to_string(),
912            coin: (*parts.get(2).context("Missing coin")?).to_string(),
913        }),
914        Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
915            user: (*parts.get(1).context("Missing user")?).to_string(),
916        }),
917        Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
918            user: (*parts.get(1).context("Missing user")?).to_string(),
919        }),
920        Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
921            coin: Ustr::from(parts.get(1).context("Missing coin")?),
922        }),
923        Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
924        None => anyhow::bail!("Empty topic string"),
925    }
926}
927
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Builds the topic key for a subscription request, in the `channel[:field…]`
    /// form that `subscription_from_topic` parses.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        match sub {
            SubscriptionRequest::AllMids { dex: Some(dex) } => format!("allMids:{dex}"),
            SubscriptionRequest::AllMids { dex: None } => "allMids".to_string(),
            SubscriptionRequest::Notification { user } => format!("notification:{user}"),
            SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
            SubscriptionRequest::Candle { coin, interval } => {
                let interval = interval.as_str();
                format!("candle:{coin}:{interval}")
            }
            SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
            SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
            SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
            SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
            SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
            SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
            SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
                format!("userNonFundingLedgerUpdates:{user}")
            }
            SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
            SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
                format!("activeSpotAssetCtx:{coin}")
            }
            SubscriptionRequest::ActiveAssetData { user, coin } => {
                format!("activeAssetData:{user}:{coin}")
            }
            SubscriptionRequest::UserTwapSliceFills { user } => {
                format!("userTwapSliceFills:{user}")
            }
            SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
            SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
        }
    }

    // Each request variant maps to its expected `channel:field` topic
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual_topic = subscription_topic(&subscription);
        assert_eq!(actual_topic, expected_topic);
    }

    // Different channels for the same coin must not collide
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    // Topic -> request -> topic must round-trip to the same topic
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let round_tripped = subscription_from_topic(&original_topic)
            .map(|request| subscription_topic(&request))
            .expect("Failed to reconstruct");

        assert_eq!(round_tripped, original_topic);
    }

    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}