nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use anyhow::Result;
19use futures_util::future::BoxFuture;
20use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
21use tokio::sync::mpsc;
22use tokio_tungstenite::tungstenite::Message;
23use tracing::{debug, error, info, warn};
24
25use crate::{
26    http::error::{Error, Result as HyperliquidResult},
27    websocket::{
28        messages::{
29            ActionPayload, HyperliquidWsMessage, HyperliquidWsRequest, PostRequest,
30            PostResponsePayload, SubscriptionRequest,
31        },
32        post::{
33            PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
34        },
35    },
36};
37
38/// Errors that can occur during Hyperliquid WebSocket operations.
39#[derive(Debug, Clone, thiserror::Error)]
40pub enum HyperliquidError {
41    #[error("URL parsing failed: {0}")]
42    UrlParsing(String),
43
44    #[error("Message serialization failed: {0}")]
45    MessageSerialization(String),
46
47    #[error("Message deserialization failed: {0}")]
48    MessageDeserialization(String),
49
50    #[error("WebSocket connection failed: {0}")]
51    Connection(String),
52
53    #[error("Channel send failed: {0}")]
54    ChannelSend(String),
55}
56
57/// Codec for encoding and decoding Hyperliquid WebSocket messages.
58///
59/// This struct provides methods to validate URLs and serialize/deserialize messages
60/// according to the Hyperliquid WebSocket protocol.
61#[derive(Debug, Default)]
62pub struct HyperliquidCodec;
63
64impl HyperliquidCodec {
65    /// Creates a new Hyperliquid codec instance.
66    pub fn new() -> Self {
67        Self
68    }
69
70    /// Validates that a URL is a proper WebSocket URL.
71    pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
72        if url.starts_with("ws://") || url.starts_with("wss://") {
73            Ok(())
74        } else {
75            Err(HyperliquidError::UrlParsing(format!(
76                "URL must start with ws:// or wss://, got: {}",
77                url
78            )))
79        }
80    }
81
82    /// Encodes a WebSocket request to JSON bytes.
83    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
84        serde_json::to_vec(request).map_err(|e| {
85            HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
86        })
87    }
88
89    /// Decodes JSON bytes to a WebSocket message.
90    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
91        serde_json::from_slice(data).map_err(|e| {
92            HyperliquidError::MessageDeserialization(format!(
93                "Failed to deserialize message: {}",
94                e
95            ))
96        })
97    }
98}
99
100/// Low-level Hyperliquid WebSocket client that wraps Nautilus WebSocketClient.
101///
102/// This is the inner client that handles the transport layer and provides low-level
103/// WebSocket methods with `ws_*` prefixes.
104#[derive(Debug)]
105pub struct HyperliquidWebSocketInnerClient {
106    inner: Arc<WebSocketClient>,
107    rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
108    sent_subscriptions: HashSet<String>,
109    _reader_task: tokio::task::JoinHandle<()>,
110    post_router: Arc<PostRouter>,
111    post_ids: PostIds,
112    #[allow(dead_code)] // Reserved for future direct WebSocket operations
113    ws_sender: WsSender,
114    post_batcher: PostBatcher,
115}
116
117impl HyperliquidWebSocketInnerClient {
118    /// Creates a new Hyperliquid WebSocket inner client with reconnection/backoff/heartbeat.
119    /// Returns a client that owns the inbound message receiver.
120    pub async fn connect(url: &str) -> Result<Self> {
121        // Create message handler for receiving raw WebSocket messages
122        let (message_handler, mut raw_rx) = channel_message_handler();
123
124        let cfg = WebSocketConfig {
125            url: url.to_string(),
126            headers: vec![],
127            message_handler: Some(message_handler),
128            heartbeat: Some(20), // seconds; set lower than server idle timeout
129            heartbeat_msg: None, // use WS Ping frames by default
130            ping_handler: None,
131            reconnect_timeout_ms: Some(15_000),
132            reconnect_delay_initial_ms: Some(250),
133            reconnect_delay_max_ms: Some(5_000),
134            reconnect_backoff_factor: Some(2.0),
135            reconnect_jitter_ms: Some(200),
136        };
137
138        let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
139        info!("Hyperliquid WebSocket connected: {}", url);
140
141        let post_router = PostRouter::new();
142        let post_ids = PostIds::new(1);
143        let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
144        let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
145
146        let ws_sender = WsSender::new(tx_outbound);
147
148        // Reader task: decode messages and route post replies *before* handing to general pipeline.
149        let post_router_for_reader = Arc::clone(&post_router);
150        let reader_task = tokio::spawn(async move {
151            while let Some(msg) = raw_rx.recv().await {
152                match msg {
153                    Message::Text(txt) => {
154                        debug!("Received WS text: {}", txt);
155                        match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
156                            Ok(hl_msg) => {
157                                if let HyperliquidWsMessage::Post { data } = &hl_msg {
158                                    // Route the correlated response
159                                    post_router_for_reader.complete(data.clone()).await;
160                                }
161                                if let Err(e) = tx_inbound.send(hl_msg).await {
162                                    error!("Failed to send decoded message: {}", e);
163                                    break;
164                                }
165                            }
166                            Err(err) => {
167                                error!(
168                                    "Failed to decode Hyperliquid message: {} | text: {}",
169                                    err, txt
170                                );
171                            }
172                        }
173                    }
174                    Message::Binary(data) => {
175                        debug!("Received binary message ({} bytes), ignoring", data.len())
176                    }
177                    Message::Ping(data) => debug!("Received ping frame ({} bytes)", data.len()),
178                    Message::Pong(data) => debug!("Received pong frame ({} bytes)", data.len()),
179                    Message::Close(close_frame) => {
180                        info!("Received close frame: {:?}", close_frame);
181                        break;
182                    }
183                    Message::Frame(_) => warn!("Received raw frame (unexpected)"),
184                }
185            }
186            info!("Hyperliquid WebSocket reader finished");
187        });
188
189        // Spawn task to handle outbound messages
190        let client_for_sender = Arc::clone(&client);
191        tokio::spawn(async move {
192            while let Some(req) = rx_outbound.recv().await {
193                let json = match serde_json::to_string(&req) {
194                    Ok(json) => json,
195                    Err(e) => {
196                        error!("Failed to serialize WS request: {}", e);
197                        continue;
198                    }
199                };
200                debug!("Sending WS message: {}", json);
201                if let Err(e) = client_for_sender.send_text(json, None).await {
202                    error!("Failed to send WS message: {}", e);
203                    break;
204                }
205            }
206            info!("WebSocket sender task finished");
207        });
208
209        // Create send function for batcher using a proper async closure
210        let ws_sender_for_batcher = ws_sender.clone();
211
212        let send_fn =
213            move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
214                let sender = ws_sender_for_batcher.clone();
215                Box::pin(async move { sender.send(req).await })
216            };
217
218        let post_batcher = PostBatcher::new(send_fn);
219
220        let hl_client = Self {
221            inner: client,
222            rx_inbound,
223            sent_subscriptions: HashSet::new(),
224            _reader_task: reader_task,
225            post_router,
226            post_ids,
227            ws_sender,
228            post_batcher,
229        };
230
231        Ok(hl_client)
232    }
233
234    /// Low-level method to send a Hyperliquid WebSocket request.
235    pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> Result<()> {
236        let json = serde_json::to_string(request)?;
237        debug!("Sending WS message: {}", json);
238        self.inner
239            .send_text(json, None)
240            .await
241            .map_err(|e| anyhow::anyhow!(e))
242    }
243
244    /// Low-level method to send a request only once (dedup by JSON serialization).
245    pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
246        let json = serde_json::to_string(request)?;
247        if self.sent_subscriptions.contains(&json) {
248            debug!("Skipping duplicate request: {}", json);
249            return Ok(());
250        }
251
252        debug!("Sending WS message: {}", json);
253        self.inner
254            .send_text(json.clone(), None)
255            .await
256            .map_err(|e| anyhow::anyhow!(e))?;
257
258        self.sent_subscriptions.insert(json);
259        Ok(())
260    }
261
262    /// Low-level method to subscribe to a specific channel.
263    pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> Result<()> {
264        let request = HyperliquidWsRequest::Subscribe { subscription };
265        self.ws_send_once(&request).await
266    }
267
268    /// Get the next event from the WebSocket stream.
269    /// Returns None when the connection is closed or the receiver is exhausted.
270    pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
271        self.rx_inbound.recv().await
272    }
273
274    /// Returns true if the WebSocket connection is active.
275    pub fn is_active(&self) -> bool {
276        self.inner.is_active()
277    }
278
279    /// Returns true if the WebSocket is reconnecting.
280    pub fn is_reconnecting(&self) -> bool {
281        self.inner.is_reconnecting()
282    }
283
284    /// Returns true if the WebSocket is disconnecting.
285    pub fn is_disconnecting(&self) -> bool {
286        self.inner.is_disconnecting()
287    }
288
289    /// Returns true if the WebSocket is closed.
290    pub fn is_closed(&self) -> bool {
291        self.inner.is_closed()
292    }
293
294    /// Disconnect the WebSocket client.
295    pub async fn ws_disconnect(&mut self) -> Result<()> {
296        self.inner.disconnect().await;
297        Ok(())
298    }
299
300    /// Convenience: enqueue a post on a specific lane.
301    async fn enqueue_post(
302        &self,
303        id: u64,
304        request: PostRequest,
305        lane: PostLane,
306    ) -> HyperliquidResult<()> {
307        self.post_batcher
308            .enqueue(ScheduledPost { id, request, lane })
309            .await
310    }
311
312    /// Core: send an Info post and await response with timeout.
313    pub async fn post_info_raw(
314        &self,
315        payload: serde_json::Value,
316        timeout: Duration,
317    ) -> HyperliquidResult<PostResponsePayload> {
318        let id = self.post_ids.next();
319        let rx = self.post_router.register(id).await?;
320        self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
321            .await?;
322        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
323        Ok(resp.response)
324    }
325
326    /// Core: send an Action post and await response with timeout.
327    pub async fn post_action_raw(
328        &self,
329        action: ActionPayload,
330        timeout: Duration,
331    ) -> HyperliquidResult<PostResponsePayload> {
332        let id = self.post_ids.next();
333        let rx = self.post_router.register(id).await?;
334        let lane = lane_for_action(&action.action);
335        self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
336            .await?;
337        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
338        Ok(resp.response)
339    }
340
341    /// Get l2Book via WS post and parse using shared REST model.
342    pub async fn info_l2_book(
343        &self,
344        coin: &str,
345        timeout: Duration,
346    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
347        let payload = match self
348            .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
349            .await?
350        {
351            PostResponsePayload::Info { payload } => payload,
352            PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
353            PostResponsePayload::Action { .. } => {
354                return Err(Error::decode("expected info payload, got action"));
355            }
356        };
357        serde_json::from_value(payload).map_err(Error::Serde)
358    }
359}
360
361/// High-level Hyperliquid WebSocket client that provides standardized domain methods.
362///
363/// This is the outer client that wraps the inner client and provides Nautilus-specific
364/// functionality for WebSocket operations using standard domain methods.
365#[derive(Debug)]
366pub struct HyperliquidWebSocketClient {
367    inner: HyperliquidWebSocketInnerClient,
368}
369
370impl HyperliquidWebSocketClient {
371    /// Creates a new Hyperliquid WebSocket client.
372    pub async fn connect(url: &str) -> Result<Self> {
373        let inner = HyperliquidWebSocketInnerClient::connect(url).await?;
374        Ok(Self { inner })
375    }
376
377    /// Subscribe to order updates for a specific user address.
378    pub async fn subscribe_order_updates(&mut self, user: &str) -> Result<()> {
379        let subscription = SubscriptionRequest::OrderUpdates {
380            user: user.to_string(),
381        };
382        self.inner.ws_subscribe(subscription).await
383    }
384
385    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
386    pub async fn subscribe_user_events(&mut self, user: &str) -> Result<()> {
387        let subscription = SubscriptionRequest::UserEvents {
388            user: user.to_string(),
389        };
390        self.inner.ws_subscribe(subscription).await
391    }
392
393    /// Subscribe to all user channels (order updates + user events) for convenience.
394    pub async fn subscribe_all_user_channels(&mut self, user: &str) -> Result<()> {
395        self.subscribe_order_updates(user).await?;
396        self.subscribe_user_events(user).await?;
397        Ok(())
398    }
399
400    /// Get the next event from the WebSocket stream.
401    /// Returns None when the connection is closed or the receiver is exhausted.
402    pub async fn next_event(&mut self) -> Option<HyperliquidWsMessage> {
403        self.inner.ws_next_event().await
404    }
405
406    /// Returns true if the WebSocket connection is active.
407    pub fn is_active(&self) -> bool {
408        self.inner.is_active()
409    }
410
411    /// Returns true if the WebSocket is reconnecting.
412    pub fn is_reconnecting(&self) -> bool {
413        self.inner.is_reconnecting()
414    }
415
416    /// Returns true if the WebSocket is disconnecting.
417    pub fn is_disconnecting(&self) -> bool {
418        self.inner.is_disconnecting()
419    }
420
421    /// Returns true if the WebSocket is closed.
422    pub fn is_closed(&self) -> bool {
423        self.inner.is_closed()
424    }
425
426    /// Disconnect the WebSocket client.
427    pub async fn disconnect(&mut self) -> Result<()> {
428        self.inner.ws_disconnect().await
429    }
430
431    /// Escape hatch: send raw requests for tests/power users.
432    pub async fn send_raw(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
433        self.inner.ws_send(request).await
434    }
435
436    /// High-level: call info l2Book (WS post)
437    pub async fn info_l2_book(
438        &mut self,
439        coin: &str,
440        timeout: Duration,
441    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
442        self.inner.info_l2_book(coin, timeout).await
443    }
444
445    /// High-level: fire arbitrary info (WS post) returning raw payload.
446    pub async fn post_info_raw(
447        &mut self,
448        payload: serde_json::Value,
449        timeout: Duration,
450    ) -> HyperliquidResult<PostResponsePayload> {
451        self.inner.post_info_raw(payload, timeout).await
452    }
453
454    /// High-level: fire action (already signed ActionPayload)
455    pub async fn post_action_raw(
456        &mut self,
457        action: ActionPayload,
458        timeout: Duration,
459    ) -> HyperliquidResult<PostResponsePayload> {
460        self.inner.post_action_raw(action, timeout).await
461    }
462}