nautilus_hyperliquid/websocket/
client.rs1use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use anyhow::Result;
19use futures_util::future::BoxFuture;
20use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
21use tokio::sync::mpsc;
22use tokio_tungstenite::tungstenite::Message;
23use tracing::{debug, error, info, warn};
24
25use crate::{
26 http::error::{Error, Result as HyperliquidResult},
27 websocket::{
28 messages::{
29 ActionPayload, HyperliquidWsMessage, HyperliquidWsRequest, PostRequest,
30 PostResponsePayload, SubscriptionRequest,
31 },
32 post::{
33 PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
34 },
35 },
36};
37
38#[derive(Debug, Clone, thiserror::Error)]
40pub enum HyperliquidError {
41 #[error("URL parsing failed: {0}")]
42 UrlParsing(String),
43
44 #[error("Message serialization failed: {0}")]
45 MessageSerialization(String),
46
47 #[error("Message deserialization failed: {0}")]
48 MessageDeserialization(String),
49
50 #[error("WebSocket connection failed: {0}")]
51 Connection(String),
52
53 #[error("Channel send failed: {0}")]
54 ChannelSend(String),
55}
56
57#[derive(Debug, Default)]
62pub struct HyperliquidCodec;
63
64impl HyperliquidCodec {
65 pub fn new() -> Self {
67 Self
68 }
69
70 pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
72 if url.starts_with("ws://") || url.starts_with("wss://") {
73 Ok(())
74 } else {
75 Err(HyperliquidError::UrlParsing(format!(
76 "URL must start with ws:// or wss://, got: {}",
77 url
78 )))
79 }
80 }
81
82 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
84 serde_json::to_vec(request).map_err(|e| {
85 HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
86 })
87 }
88
89 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
91 serde_json::from_slice(data).map_err(|e| {
92 HyperliquidError::MessageDeserialization(format!(
93 "Failed to deserialize message: {}",
94 e
95 ))
96 })
97 }
98}
99
100#[derive(Debug)]
105pub struct HyperliquidWebSocketInnerClient {
106 inner: Arc<WebSocketClient>,
107 rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
108 sent_subscriptions: HashSet<String>,
109 _reader_task: tokio::task::JoinHandle<()>,
110 post_router: Arc<PostRouter>,
111 post_ids: PostIds,
112 #[allow(dead_code)] ws_sender: WsSender,
114 post_batcher: PostBatcher,
115}
116
117impl HyperliquidWebSocketInnerClient {
118 pub async fn connect(url: &str) -> Result<Self> {
121 let (message_handler, mut raw_rx) = channel_message_handler();
123
124 let cfg = WebSocketConfig {
125 url: url.to_string(),
126 headers: vec![],
127 message_handler: Some(message_handler),
128 heartbeat: Some(20), heartbeat_msg: None, ping_handler: None,
131 reconnect_timeout_ms: Some(15_000),
132 reconnect_delay_initial_ms: Some(250),
133 reconnect_delay_max_ms: Some(5_000),
134 reconnect_backoff_factor: Some(2.0),
135 reconnect_jitter_ms: Some(200),
136 };
137
138 let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
139 info!("Hyperliquid WebSocket connected: {}", url);
140
141 let post_router = PostRouter::new();
142 let post_ids = PostIds::new(1);
143 let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
144 let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
145
146 let ws_sender = WsSender::new(tx_outbound);
147
148 let post_router_for_reader = Arc::clone(&post_router);
150 let reader_task = tokio::spawn(async move {
151 while let Some(msg) = raw_rx.recv().await {
152 match msg {
153 Message::Text(txt) => {
154 debug!("Received WS text: {}", txt);
155 match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
156 Ok(hl_msg) => {
157 if let HyperliquidWsMessage::Post { data } = &hl_msg {
158 post_router_for_reader.complete(data.clone()).await;
160 }
161 if let Err(e) = tx_inbound.send(hl_msg).await {
162 error!("Failed to send decoded message: {}", e);
163 break;
164 }
165 }
166 Err(err) => {
167 error!(
168 "Failed to decode Hyperliquid message: {} | text: {}",
169 err, txt
170 );
171 }
172 }
173 }
174 Message::Binary(data) => {
175 debug!("Received binary message ({} bytes), ignoring", data.len())
176 }
177 Message::Ping(data) => debug!("Received ping frame ({} bytes)", data.len()),
178 Message::Pong(data) => debug!("Received pong frame ({} bytes)", data.len()),
179 Message::Close(close_frame) => {
180 info!("Received close frame: {:?}", close_frame);
181 break;
182 }
183 Message::Frame(_) => warn!("Received raw frame (unexpected)"),
184 }
185 }
186 info!("Hyperliquid WebSocket reader finished");
187 });
188
189 let client_for_sender = Arc::clone(&client);
191 tokio::spawn(async move {
192 while let Some(req) = rx_outbound.recv().await {
193 let json = match serde_json::to_string(&req) {
194 Ok(json) => json,
195 Err(e) => {
196 error!("Failed to serialize WS request: {}", e);
197 continue;
198 }
199 };
200 debug!("Sending WS message: {}", json);
201 if let Err(e) = client_for_sender.send_text(json, None).await {
202 error!("Failed to send WS message: {}", e);
203 break;
204 }
205 }
206 info!("WebSocket sender task finished");
207 });
208
209 let ws_sender_for_batcher = ws_sender.clone();
211
212 let send_fn =
213 move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
214 let sender = ws_sender_for_batcher.clone();
215 Box::pin(async move { sender.send(req).await })
216 };
217
218 let post_batcher = PostBatcher::new(send_fn);
219
220 let hl_client = Self {
221 inner: client,
222 rx_inbound,
223 sent_subscriptions: HashSet::new(),
224 _reader_task: reader_task,
225 post_router,
226 post_ids,
227 ws_sender,
228 post_batcher,
229 };
230
231 Ok(hl_client)
232 }
233
234 pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> Result<()> {
236 let json = serde_json::to_string(request)?;
237 debug!("Sending WS message: {}", json);
238 self.inner
239 .send_text(json, None)
240 .await
241 .map_err(|e| anyhow::anyhow!(e))
242 }
243
244 pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
246 let json = serde_json::to_string(request)?;
247 if self.sent_subscriptions.contains(&json) {
248 debug!("Skipping duplicate request: {}", json);
249 return Ok(());
250 }
251
252 debug!("Sending WS message: {}", json);
253 self.inner
254 .send_text(json.clone(), None)
255 .await
256 .map_err(|e| anyhow::anyhow!(e))?;
257
258 self.sent_subscriptions.insert(json);
259 Ok(())
260 }
261
262 pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> Result<()> {
264 let request = HyperliquidWsRequest::Subscribe { subscription };
265 self.ws_send_once(&request).await
266 }
267
268 pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
271 self.rx_inbound.recv().await
272 }
273
274 pub fn is_active(&self) -> bool {
276 self.inner.is_active()
277 }
278
279 pub fn is_reconnecting(&self) -> bool {
281 self.inner.is_reconnecting()
282 }
283
284 pub fn is_disconnecting(&self) -> bool {
286 self.inner.is_disconnecting()
287 }
288
289 pub fn is_closed(&self) -> bool {
291 self.inner.is_closed()
292 }
293
294 pub async fn ws_disconnect(&mut self) -> Result<()> {
296 self.inner.disconnect().await;
297 Ok(())
298 }
299
300 async fn enqueue_post(
302 &self,
303 id: u64,
304 request: PostRequest,
305 lane: PostLane,
306 ) -> HyperliquidResult<()> {
307 self.post_batcher
308 .enqueue(ScheduledPost { id, request, lane })
309 .await
310 }
311
312 pub async fn post_info_raw(
314 &self,
315 payload: serde_json::Value,
316 timeout: Duration,
317 ) -> HyperliquidResult<PostResponsePayload> {
318 let id = self.post_ids.next();
319 let rx = self.post_router.register(id).await?;
320 self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
321 .await?;
322 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
323 Ok(resp.response)
324 }
325
326 pub async fn post_action_raw(
328 &self,
329 action: ActionPayload,
330 timeout: Duration,
331 ) -> HyperliquidResult<PostResponsePayload> {
332 let id = self.post_ids.next();
333 let rx = self.post_router.register(id).await?;
334 let lane = lane_for_action(&action.action);
335 self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
336 .await?;
337 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
338 Ok(resp.response)
339 }
340
341 pub async fn info_l2_book(
343 &self,
344 coin: &str,
345 timeout: Duration,
346 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
347 let payload = match self
348 .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
349 .await?
350 {
351 PostResponsePayload::Info { payload } => payload,
352 PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
353 PostResponsePayload::Action { .. } => {
354 return Err(Error::decode("expected info payload, got action"));
355 }
356 };
357 serde_json::from_value(payload).map_err(Error::Serde)
358 }
359}
360
361#[derive(Debug)]
366pub struct HyperliquidWebSocketClient {
367 inner: HyperliquidWebSocketInnerClient,
368}
369
370impl HyperliquidWebSocketClient {
371 pub async fn connect(url: &str) -> Result<Self> {
373 let inner = HyperliquidWebSocketInnerClient::connect(url).await?;
374 Ok(Self { inner })
375 }
376
377 pub async fn subscribe_order_updates(&mut self, user: &str) -> Result<()> {
379 let subscription = SubscriptionRequest::OrderUpdates {
380 user: user.to_string(),
381 };
382 self.inner.ws_subscribe(subscription).await
383 }
384
385 pub async fn subscribe_user_events(&mut self, user: &str) -> Result<()> {
387 let subscription = SubscriptionRequest::UserEvents {
388 user: user.to_string(),
389 };
390 self.inner.ws_subscribe(subscription).await
391 }
392
393 pub async fn subscribe_all_user_channels(&mut self, user: &str) -> Result<()> {
395 self.subscribe_order_updates(user).await?;
396 self.subscribe_user_events(user).await?;
397 Ok(())
398 }
399
400 pub async fn next_event(&mut self) -> Option<HyperliquidWsMessage> {
403 self.inner.ws_next_event().await
404 }
405
406 pub fn is_active(&self) -> bool {
408 self.inner.is_active()
409 }
410
411 pub fn is_reconnecting(&self) -> bool {
413 self.inner.is_reconnecting()
414 }
415
416 pub fn is_disconnecting(&self) -> bool {
418 self.inner.is_disconnecting()
419 }
420
421 pub fn is_closed(&self) -> bool {
423 self.inner.is_closed()
424 }
425
426 pub async fn disconnect(&mut self) -> Result<()> {
428 self.inner.ws_disconnect().await
429 }
430
431 pub async fn send_raw(&mut self, request: &HyperliquidWsRequest) -> Result<()> {
433 self.inner.ws_send(request).await
434 }
435
436 pub async fn info_l2_book(
438 &mut self,
439 coin: &str,
440 timeout: Duration,
441 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
442 self.inner.info_l2_book(coin, timeout).await
443 }
444
445 pub async fn post_info_raw(
447 &mut self,
448 payload: serde_json::Value,
449 timeout: Duration,
450 ) -> HyperliquidResult<PostResponsePayload> {
451 self.inner.post_info_raw(payload, timeout).await
452 }
453
454 pub async fn post_action_raw(
456 &mut self,
457 action: ActionPayload,
458 timeout: Duration,
459 ) -> HyperliquidResult<PostResponsePayload> {
460 self.inner.post_action_raw(action, timeout).await
461 }
462}