nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashSet,
18    str::FromStr,
19    sync::{
20        Arc,
21        atomic::{AtomicBool, AtomicU8, Ordering},
22    },
23};
24
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_model::{
29    data::BarType,
30    identifiers::{AccountId, InstrumentId},
31    instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34    mode::ConnectionMode,
35    websocket::{
36        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
37    },
38};
39use ustr::Ustr;
40
41use crate::{
42    common::{HyperliquidProductType, enums::HyperliquidBarInterval, parse::bar_type_to_interval},
43    websocket::{
44        handler::{FeedHandler, HandlerCommand},
45        messages::{NautilusWsMessage, SubscriptionRequest},
46    },
47};
48
/// JSON payload passed to the WebSocket configuration as its `heartbeat_msg`
/// (see `connect()`).
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
50
/// Represents the different data types available from asset context subscriptions.
///
/// The client tracks a set of these per coin so that a single `ActiveAssetCtx`
/// subscription can serve several logical data subscriptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices` (index/oracle price).
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
58
/// Hyperliquid WebSocket client following the BitMEX pattern.
///
/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
///
/// Clones share the command channel, caches and connection state with the original,
/// but never carry the outbound message receiver or the handler task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct HyperliquidWebSocketClient {
    // WebSocket endpoint used by `connect()`.
    url: String,
    // Stored at construction; not read by the code visible in this file section.
    product_type: HyperliquidProductType,
    // Swappable handle to the connection-mode atomic; `connect()` points it at the
    // underlying client's atomic and `is_active()` reads it.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    // Stop flag handed to the handler; set to `true` by `disconnect()`.
    signal: Arc<AtomicBool>,
    // Sender for handler commands. Holds a placeholder channel until `connect()`
    // installs the real one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    // Receiver for handler output, consumed by `next_event()`; `None` before
    // `connect()` and in clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    // Created in `new()` and cloned with the client; not otherwise used in the visible code.
    auth_tracker: AuthTracker,
    // Subscription state shared with the handler; its topics are replayed after a reconnect.
    subscriptions: SubscriptionState,
    // Instrument cache keyed by instrument symbol.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    // Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    // Per-coin set of asset-context data types currently requested.
    asset_context_subs: Arc<DashMap<Ustr, HashSet<AssetContextDataType>>>,
    // Handle of the spawned handler task; `None` before `connect()` and in clones.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    // Optional account identifier forwarded to the handler.
    account_id: Option<AccountId>,
}
83
84impl Clone for HyperliquidWebSocketClient {
85    fn clone(&self) -> Self {
86        Self {
87            url: self.url.clone(),
88            product_type: self.product_type,
89            connection_mode: Arc::clone(&self.connection_mode),
90            signal: Arc::clone(&self.signal),
91            cmd_tx: Arc::clone(&self.cmd_tx),
92            out_rx: None,
93            auth_tracker: self.auth_tracker.clone(),
94            subscriptions: self.subscriptions.clone(),
95            instruments: Arc::clone(&self.instruments),
96            bar_types: Arc::clone(&self.bar_types),
97            asset_context_subs: Arc::clone(&self.asset_context_subs),
98            task_handle: None,
99            account_id: self.account_id,
100        }
101    }
102}
103
104impl HyperliquidWebSocketClient {
105    /// Creates a new Hyperliquid WebSocket client without connecting.
106    ///
107    /// If `url` is `None`, the appropriate URL will be determined based on the `testnet` flag:
108    /// - `testnet=false`: `wss://api.hyperliquid.xyz/ws`
109    /// - `testnet=true`: `wss://api.hyperliquid-testnet.xyz/ws`
110    ///
111    /// The connection will be established when `connect()` is called.
112    pub fn new(
113        url: Option<String>,
114        testnet: bool,
115        product_type: HyperliquidProductType,
116        account_id: Option<AccountId>,
117    ) -> Self {
118        let url = url.unwrap_or_else(|| {
119            if testnet {
120                "wss://api.hyperliquid-testnet.xyz/ws".to_string()
121            } else {
122                "wss://api.hyperliquid.xyz/ws".to_string()
123            }
124        });
125        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
126            ConnectionMode::Closed as u8,
127        ))));
128        Self {
129            url,
130            product_type,
131            connection_mode,
132            signal: Arc::new(AtomicBool::new(false)),
133            auth_tracker: AuthTracker::new(),
134            subscriptions: SubscriptionState::new(':'),
135            instruments: Arc::new(DashMap::new()),
136            bar_types: Arc::new(DashMap::new()),
137            asset_context_subs: Arc::new(DashMap::new()),
138            cmd_tx: {
139                // Placeholder channel until connect() creates the real handler and replays queued instruments
140                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
141                Arc::new(tokio::sync::RwLock::new(tx))
142            },
143            out_rx: None,
144            task_handle: None,
145            account_id,
146        }
147    }
148
    /// Establishes WebSocket connection and spawns the message handler.
    ///
    /// Returns `Ok(())` immediately (with a warning) if the client is already active.
    /// Otherwise it connects the underlying `WebSocketClient`, creates fresh command and
    /// output channels, hands the client and any cached instruments to a new `FeedHandler`
    /// running in a spawned task, and finally installs the new channels on `self`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying connection fails or if the initial `SetClient`
    /// command cannot be queued.
    ///
    /// NOTE(review): `signal` is set to `true` by `disconnect()` and is not reset here —
    /// confirm whether calling `connect()` again after `disconnect()` is expected to work.
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        if self.is_active() {
            tracing::warn!("WebSocket already connected");
            return Ok(());
        }
        let (message_handler, raw_rx) = channel_message_handler();
        // Heartbeat and reconnect tuning; `heartbeat` is presumably in seconds — TODO confirm
        let cfg = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![],
            message_handler: Some(message_handler),
            heartbeat: Some(30),
            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
            ping_handler: None,
            reconnect_timeout_ms: Some(15_000),
            reconnect_delay_initial_ms: Some(250),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(2.0),
            reconnect_jitter_ms: Some(200),
            reconnect_max_attempts: None,
        };
        let client = WebSocketClient::connect(cfg, None, vec![], None).await?;

        // Atomically swap connection state to the client's atomic
        self.connection_mode.store(client.connection_mode_atomic());
        tracing::info!("Hyperliquid WebSocket connected: {}", self.url);

        // Create channels for handler communication
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();

        // Send SetClient command immediately
        // (queued before any other command so it is the first one the handler receives)
        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
            anyhow::bail!("Failed to send SetClient command: {e}");
        }

        // Initialize handler with existing instruments
        // (replays everything cached before the handler existed)
        let instruments_vec: Vec<InstrumentAny> = self
            .instruments
            .iter()
            .map(|entry| entry.value().clone())
            .collect();
        if !instruments_vec.is_empty()
            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
        {
            tracing::error!("Failed to send InitializeInstruments: {e}");
        }

        // Spawn handler task
        let signal = Arc::clone(&self.signal);
        let account_id = self.account_id;
        let subscriptions = self.subscriptions.clone();
        // Extra sender kept inside the task so it can queue resubscribe commands itself
        let cmd_tx_for_reconnect = cmd_tx.clone();

        let stream_handle = tokio::spawn(async move {
            let mut handler = FeedHandler::new(
                signal,
                cmd_rx,
                raw_rx,
                out_tx,
                account_id,
                subscriptions.clone(),
            );

            // Re-queues a Subscribe command for every topic in the shared subscription
            // state; topics that cannot be parsed back into a request are logged and skipped.
            let resubscribe_all = || {
                let topics = subscriptions.all_topics();
                if topics.is_empty() {
                    tracing::debug!("No active subscriptions to restore after reconnection");
                    return;
                }

                tracing::info!(
                    "Resubscribing to {} active subscriptions after reconnection",
                    topics.len()
                );
                for topic in topics {
                    match subscription_from_topic(&topic) {
                        Ok(subscription) => {
                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
                                subscriptions: vec![subscription],
                            }) {
                                tracing::error!(error = %e, "Failed to send resubscribe command");
                            }
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e,
                                topic = %topic,
                                "Failed to reconstruct subscription from topic"
                            );
                        }
                    }
                }
            };
            // Pump handler output until the stream ends or the consumer goes away
            loop {
                match handler.next().await {
                    // Reconnect notifications are handled here and not forwarded
                    Some(NautilusWsMessage::Reconnected) => {
                        tracing::info!("WebSocket reconnected");
                        resubscribe_all();
                        continue;
                    }
                    Some(msg) => {
                        if handler.send(msg).is_err() {
                            tracing::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    // Both cases end the loop; they differ only in how they are logged
                    None => {
                        if handler.is_stopped() {
                            tracing::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        tracing::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }
            tracing::debug!("Handler task completed");
        });
        // Publish the new task and channels; clones sharing `cmd_tx` see the new sender too
        self.task_handle = Some(Arc::new(stream_handle));
        *self.cmd_tx.write().await = cmd_tx;
        self.out_rx = Some(out_rx);
        Ok(())
    }
273
274    /// Disconnects the WebSocket connection.
275    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
276        tracing::info!("Disconnecting Hyperliquid WebSocket");
277        self.signal.store(true, Ordering::Relaxed);
278        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
279            tracing::debug!(
280                "Failed to send disconnect command (handler may already be shut down): {e}"
281            );
282        }
283        if let Some(task_handle) = self.task_handle.take() {
284            match Arc::try_unwrap(task_handle) {
285                Ok(handle) => {
286                    tracing::debug!("Waiting for task handle to complete");
287                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
288                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
289                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
290                        Err(_) => {
291                            tracing::warn!(
292                                "Timeout waiting for task handle, task may still be running"
293                            );
294                        }
295                    }
296                }
297                Err(arc_handle) => {
298                    tracing::debug!(
299                        "Cannot take ownership of task handle - other references exist, aborting task"
300                    );
301                    arc_handle.abort();
302                }
303            }
304        } else {
305            tracing::debug!("No task handle to await");
306        }
307        tracing::debug!("Disconnected");
308        Ok(())
309    }
310
311    /// Returns true if the WebSocket is actively connected.
312    pub fn is_active(&self) -> bool {
313        let mode = self.connection_mode.load();
314        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
315    }
316
317    /// Returns the URL of this WebSocket client.
318    pub fn url(&self) -> &str {
319        &self.url
320    }
321
322    /// Subscribe to order updates for a specific user address.
323    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
324        let subscription = SubscriptionRequest::OrderUpdates {
325            user: user.to_string(),
326        };
327        self.cmd_tx
328            .read()
329            .await
330            .send(HandlerCommand::Subscribe {
331                subscriptions: vec![subscription],
332            })
333            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
334        Ok(())
335    }
336
337    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
338    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
339        let subscription = SubscriptionRequest::UserEvents {
340            user: user.to_string(),
341        };
342        self.cmd_tx
343            .read()
344            .await
345            .send(HandlerCommand::Subscribe {
346                subscriptions: vec![subscription],
347            })
348            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
349        Ok(())
350    }
351
352    /// Subscribe to all user channels (order updates + user events) for convenience.
353    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
354        self.subscribe_order_updates(user).await?;
355        self.subscribe_user_events(user).await?;
356        Ok(())
357    }
358
359    /// Subscribe to trades for an instrument.
360    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
361        let instrument = self
362            .get_instrument(&instrument_id)
363            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
364        let coin = instrument.raw_symbol().inner();
365
366        let cmd_tx = self.cmd_tx.read().await;
367
368        // Update the handler's coin→instrument mapping for this subscription
369        cmd_tx
370            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
371            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
372
373        let subscription = SubscriptionRequest::Trades { coin };
374
375        cmd_tx
376            .send(HandlerCommand::Subscribe {
377                subscriptions: vec![subscription],
378            })
379            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
380        Ok(())
381    }
382
383    /// Unsubscribe from trades for an instrument.
384    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
385        let instrument = self
386            .get_instrument(&instrument_id)
387            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
388        let coin = instrument.raw_symbol().inner();
389
390        let subscription = SubscriptionRequest::Trades { coin };
391
392        self.cmd_tx
393            .read()
394            .await
395            .send(HandlerCommand::Unsubscribe {
396                subscriptions: vec![subscription],
397            })
398            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
399        Ok(())
400    }
401
402    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
403    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
404        let instrument = self
405            .get_instrument(&instrument_id)
406            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
407        let coin = instrument.raw_symbol().inner();
408
409        let cmd_tx = self.cmd_tx.read().await;
410
411        // Update the handler's coin→instrument mapping for this subscription
412        cmd_tx
413            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
414            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
415
416        let subscription = SubscriptionRequest::Bbo { coin };
417
418        cmd_tx
419            .send(HandlerCommand::Subscribe {
420                subscriptions: vec![subscription],
421            })
422            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
423        Ok(())
424    }
425
426    /// Unsubscribe from quote ticks for an instrument.
427    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
428        let instrument = self
429            .get_instrument(&instrument_id)
430            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
431        let coin = instrument.raw_symbol().inner();
432
433        let subscription = SubscriptionRequest::Bbo { coin };
434
435        self.cmd_tx
436            .read()
437            .await
438            .send(HandlerCommand::Unsubscribe {
439                subscriptions: vec![subscription],
440            })
441            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
442        Ok(())
443    }
444
445    /// Subscribe to L2 order book for an instrument.
446    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
447        let instrument = self
448            .get_instrument(&instrument_id)
449            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
450        let coin = instrument.raw_symbol().inner();
451
452        let cmd_tx = self.cmd_tx.read().await;
453
454        // Update the handler's coin→instrument mapping for this subscription
455        cmd_tx
456            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
457            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
458
459        let subscription = SubscriptionRequest::L2Book {
460            coin,
461            mantissa: None,
462            n_sig_figs: None,
463        };
464
465        cmd_tx
466            .send(HandlerCommand::Subscribe {
467                subscriptions: vec![subscription],
468            })
469            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
470        Ok(())
471    }
472
473    /// Unsubscribe from L2 order book for an instrument.
474    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
475        let instrument = self
476            .get_instrument(&instrument_id)
477            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
478        let coin = instrument.raw_symbol().inner();
479
480        let subscription = SubscriptionRequest::L2Book {
481            coin,
482            mantissa: None,
483            n_sig_figs: None,
484        };
485
486        self.cmd_tx
487            .read()
488            .await
489            .send(HandlerCommand::Unsubscribe {
490                subscriptions: vec![subscription],
491            })
492            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
493        Ok(())
494    }
495
496    /// Subscribe to candle/bar data for a specific coin and interval.
497    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
498        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
499        let instrument = self
500            .get_instrument(&bar_type.instrument_id())
501            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
502        let coin = instrument.raw_symbol().inner();
503        let interval = bar_type_to_interval(&bar_type)?;
504        let subscription = SubscriptionRequest::Candle { coin, interval };
505
506        // Cache the bar type for parsing using canonical key
507        let key = format!("candle:{coin}:{interval}");
508        self.bar_types.insert(key.clone(), bar_type);
509
510        let cmd_tx = self.cmd_tx.read().await;
511
512        cmd_tx
513            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516        cmd_tx
517            .send(HandlerCommand::AddBarType { key, bar_type })
518            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
519
520        cmd_tx
521            .send(HandlerCommand::Subscribe {
522                subscriptions: vec![subscription],
523            })
524            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
525        Ok(())
526    }
527
528    /// Unsubscribe from candle/bar data.
529    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
530        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
531        let instrument = self
532            .get_instrument(&bar_type.instrument_id())
533            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
534        let coin = instrument.raw_symbol().inner();
535        let interval = bar_type_to_interval(&bar_type)?;
536        let subscription = SubscriptionRequest::Candle { coin, interval };
537
538        let key = format!("candle:{coin}:{interval}");
539        self.bar_types.remove(&key);
540
541        let cmd_tx = self.cmd_tx.read().await;
542
543        cmd_tx
544            .send(HandlerCommand::RemoveBarType { key })
545            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
546
547        cmd_tx
548            .send(HandlerCommand::Unsubscribe {
549                subscriptions: vec![subscription],
550            })
551            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
552        Ok(())
553    }
554
555    /// Subscribe to mark price updates for an instrument.
556    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
557        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
558            .await
559    }
560
561    /// Unsubscribe from mark price updates for an instrument.
562    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
563        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
564            .await
565    }
566
567    /// Subscribe to index/oracle price updates for an instrument.
568    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
569        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
570            .await
571    }
572
573    /// Unsubscribe from index/oracle price updates for an instrument.
574    pub async fn unsubscribe_index_prices(
575        &self,
576        instrument_id: InstrumentId,
577    ) -> anyhow::Result<()> {
578        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
579            .await
580    }
581
582    /// Subscribe to funding rate updates for an instrument.
583    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
584        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
585            .await
586    }
587
588    /// Unsubscribe from funding rate updates for an instrument.
589    pub async fn unsubscribe_funding_rates(
590        &self,
591        instrument_id: InstrumentId,
592    ) -> anyhow::Result<()> {
593        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
594            .await
595    }
596
597    async fn subscribe_asset_context_data(
598        &self,
599        instrument_id: InstrumentId,
600        data_type: AssetContextDataType,
601    ) -> anyhow::Result<()> {
602        let instrument = self
603            .get_instrument(&instrument_id)
604            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
605        let coin = instrument.raw_symbol().inner();
606
607        let mut entry = self.asset_context_subs.entry(coin).or_default();
608        let is_first_subscription = entry.is_empty();
609        entry.insert(data_type);
610        let data_types = entry.clone();
611        drop(entry);
612
613        let cmd_tx = self.cmd_tx.read().await;
614
615        cmd_tx
616            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
617            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
618
619        if is_first_subscription {
620            tracing::debug!(
621                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
622            );
623            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
624
625            cmd_tx
626                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
627                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
628
629            cmd_tx
630                .send(HandlerCommand::Subscribe {
631                    subscriptions: vec![subscription],
632                })
633                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
634        } else {
635            tracing::debug!(
636                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
637            );
638        }
639
640        Ok(())
641    }
642
643    async fn unsubscribe_asset_context_data(
644        &self,
645        instrument_id: InstrumentId,
646        data_type: AssetContextDataType,
647    ) -> anyhow::Result<()> {
648        let instrument = self
649            .get_instrument(&instrument_id)
650            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
651        let coin = instrument.raw_symbol().inner();
652
653        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
654            entry.remove(&data_type);
655            let should_unsubscribe = entry.is_empty();
656            let data_types = entry.clone();
657            drop(entry);
658
659            let cmd_tx = self.cmd_tx.read().await;
660
661            if should_unsubscribe {
662                self.asset_context_subs.remove(&coin);
663
664                tracing::debug!(
665                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
666                );
667                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
668
669                cmd_tx
670                    .send(HandlerCommand::UpdateAssetContextSubs {
671                        coin,
672                        data_types: HashSet::new(),
673                    })
674                    .map_err(|e| {
675                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
676                    })?;
677
678                cmd_tx
679                    .send(HandlerCommand::Unsubscribe {
680                        subscriptions: vec![subscription],
681                    })
682                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
683            } else {
684                tracing::debug!(
685                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
686                );
687
688                cmd_tx
689                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
690                    .map_err(|e| {
691                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
692                    })?;
693            }
694        }
695
696        Ok(())
697    }
698
699    /// Caches multiple instruments.
700    ///
701    /// Clears the existing cache first, then adds all provided instruments.
702    /// Instruments are keyed by their full Nautilus symbol (e.g., "BTC-USD-PERP").
703    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
704        self.instruments.clear();
705        for inst in instruments {
706            let symbol = inst.symbol().inner();
707            self.instruments.insert(symbol, inst.clone());
708        }
709        tracing::info!(
710            "Hyperliquid instrument cache initialized with {} instruments",
711            self.instruments.len()
712        );
713    }
714
715    /// Caches a single instrument.
716    ///
717    /// Any existing instrument with the same symbol will be replaced.
718    pub fn cache_instrument(&self, instrument: InstrumentAny) {
719        let symbol = instrument.symbol().inner();
720        self.instruments.insert(symbol, instrument.clone());
721
722        // Before connect() the handler isn't running; this send will fail and that's expected
723        // because connect() replays the instruments via InitializeInstruments
724        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
725            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
726        }
727    }
728
729    /// Gets an instrument from the cache by ID.
730    ///
731    /// Looks up the instrument by its full Nautilus symbol (e.g., "BTC-USD-PERP").
732    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
733        self.instruments
734            .get(&id.symbol.inner())
735            .map(|e| e.value().clone())
736    }
737
738    /// Gets an instrument from the cache by symbol.
739    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
740        self.instruments.get(symbol).map(|e| e.value().clone())
741    }
742
    /// Returns the count of confirmed subscriptions.
    ///
    /// This is whatever `len()` reports on the subscription state shared with the handler.
    /// NOTE(review): "confirmed" is taken from the original doc; which subscriptions
    /// `SubscriptionState::len()` counts is not visible here — confirm.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
747
748    /// Gets a bar type from the cache by coin and interval.
749    ///
750    /// This looks up the subscription key created when subscribing to bars.
751    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
752        // Use canonical key format matching subscribe_bars
753        let key = format!("candle:{coin}:{interval}");
754        self.bar_types.get(&key).map(|entry| *entry.value())
755    }
756
757    /// Receives the next message from the WebSocket handler.
758    ///
759    /// Returns `None` if the handler has disconnected or the receiver was already taken.
760    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
761        if let Some(ref mut rx) = self.out_rx {
762            rx.recv().await
763        } else {
764            None
765        }
766    }
767}
768
769/// Reconstructs a subscription request from a topic string.
770fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
771    let parts: Vec<&str> = topic.split(':').collect();
772
773    match parts.first() {
774        Some(&"allMids") => {
775            let dex = parts.get(1).map(|s| (*s).to_string());
776            Ok(SubscriptionRequest::AllMids { dex })
777        }
778        Some(&"notification") => Ok(SubscriptionRequest::Notification {
779            user: (*parts.get(1).context("Missing user")?).to_string(),
780        }),
781        Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
782            user: (*parts.get(1).context("Missing user")?).to_string(),
783        }),
784        Some(&"candle") => {
785            let coin = Ustr::from(parts.get(1).context("Missing coin")?);
786            let interval_str = parts.get(2).context("Missing interval")?;
787            let interval = HyperliquidBarInterval::from_str(interval_str)?;
788            Ok(SubscriptionRequest::Candle { coin, interval })
789        }
790        Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
791            coin: Ustr::from(parts.get(1).context("Missing coin")?),
792            mantissa: None,
793            n_sig_figs: None,
794        }),
795        Some(&"trades") => Ok(SubscriptionRequest::Trades {
796            coin: Ustr::from(parts.get(1).context("Missing coin")?),
797        }),
798        Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
799            user: (*parts.get(1).context("Missing user")?).to_string(),
800        }),
801        Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
802            user: (*parts.get(1).context("Missing user")?).to_string(),
803        }),
804        Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
805            user: (*parts.get(1).context("Missing user")?).to_string(),
806            aggregate_by_time: None,
807        }),
808        Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
809            user: (*parts.get(1).context("Missing user")?).to_string(),
810        }),
811        Some(&"userNonFundingLedgerUpdates") => {
812            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
813                user: (*parts.get(1).context("Missing user")?).to_string(),
814            })
815        }
816        Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
817            coin: Ustr::from(parts.get(1).context("Missing coin")?),
818        }),
819        Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
820            coin: Ustr::from(parts.get(1).context("Missing coin")?),
821        }),
822        Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
823            user: (*parts.get(1).context("Missing user")?).to_string(),
824            coin: (*parts.get(2).context("Missing coin")?).to_string(),
825        }),
826        Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
827            user: (*parts.get(1).context("Missing user")?).to_string(),
828        }),
829        Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
830            user: (*parts.get(1).context("Missing user")?).to_string(),
831        }),
832        Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
833            coin: Ustr::from(parts.get(1).context("Missing coin")?),
834        }),
835        Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
836        None => anyhow::bail!("Empty topic string"),
837    }
838}
839
840////////////////////////////////////////////////////////////////////////////////
841// Tests
842////////////////////////////////////////////////////////////////////////////////
843
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Builds the topic key (`channel[:field[:field]]`) for a subscription request.
    ///
    /// Test-only helper used to check generated topics and their round trip through
    /// `subscription_from_topic`.
    fn subscription_topic(request: &SubscriptionRequest) -> String {
        match request {
            SubscriptionRequest::AllMids { dex: Some(dex) } => format!("allMids:{dex}"),
            SubscriptionRequest::AllMids { dex: None } => "allMids".to_string(),
            SubscriptionRequest::Notification { user } => format!("notification:{user}"),
            SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
            SubscriptionRequest::Candle { coin, interval } => {
                let interval = interval.as_str();
                format!("candle:{coin}:{interval}")
            }
            SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
            SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
            SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
            SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
            SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
            SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
            SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
                format!("userNonFundingLedgerUpdates:{user}")
            }
            SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
            SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
                format!("activeSpotAssetCtx:{coin}")
            }
            SubscriptionRequest::ActiveAssetData { user, coin } => {
                format!("activeAssetData:{user}:{coin}")
            }
            SubscriptionRequest::UserTwapSliceFills { user } => {
                format!("userTwapSliceFills:{user}")
            }
            SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
            SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
        }
    }

    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let generated = subscription_topic(&subscription);
        assert_eq!(generated, expected_topic);
    }

    #[rstest]
    fn test_subscription_topics_unique() {
        // Same coin, different channels: the topics must not collide
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        // Round trip: request -> topic -> request -> topic must yield the same topic
        let original_topic = subscription_topic(&subscription);
        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        let rebuilt_topic = subscription_topic(&rebuilt);

        assert_eq!(rebuilt_topic, original_topic);
    }

    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}