nautilus_hyperliquid/websocket/
codec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use ustr::Ustr;
21
22use crate::{
23    common::enums::{
24        HyperliquidBarInterval::{self, OneMinute},
25        HyperliquidSide,
26    },
27    websocket::{
28        HyperliquidWsChannel, HyperliquidWsError,
29        messages::{
30            HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
31            WsLevelData,
32        },
33    },
34};
35
36/// Codec for encoding and decoding Hyperliquid WebSocket messages.
37///
38/// This struct provides methods to validate URLs and serialize/deserialize messages,
39/// according to the Hyperliquid WebSocket protocol.
40#[derive(Debug, Default)]
41pub struct HyperliquidCodec;
42
43impl HyperliquidCodec {
44    /// Creates a new Hyperliquid codec instance.
45    pub fn new() -> Self {
46        Self
47    }
48
49    /// Validates that a URL is a proper WebSocket URL.
50    pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
51        if url.starts_with("ws://") || url.starts_with("wss://") {
52            Ok(())
53        } else {
54            Err(HyperliquidWsError::UrlParsing(format!(
55                "URL must start with ws:// or wss://, was: {}",
56                url
57            )))
58        }
59    }
60
61    /// Encodes a WebSocket request to JSON bytes.
62    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
63        serde_json::to_vec(request).map_err(|e| {
64            HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
65        })
66    }
67
68    /// Decodes JSON bytes to a WebSocket message.
69    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
70        serde_json::from_slice(data).map_err(|e| {
71            HyperliquidWsError::MessageDeserialization(format!(
72                "Failed to deserialize message: {}",
73                e
74            ))
75        })
76    }
77}
78
79/// Canonical outbound (mirrors OKX/BitMEX "op + args" pattern).
80#[derive(Debug, Clone, Serialize, Deserialize)]
81#[serde(tag = "op", rename_all = "camelCase")]
82pub enum WsOutbound {
83    Subscribe {
84        args: Vec<SubArg>,
85        id: Option<String>,
86    },
87    Unsubscribe {
88        args: Vec<SubArg>,
89        id: Option<String>,
90    },
91    Ping,
92    Post {
93        id: String,
94        path: String,
95        body: serde_json::Value,
96    },
97    Auth {
98        payload: serde_json::Value,
99    },
100}
101
102// Type aliases for convenience and compatibility with your request
103pub type SubRequest = SubArg;
104pub type TradeSide = Side;
105
106#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct SubArg {
108    pub channel: HyperliquidWsChannel,
109    #[serde(default)]
110    pub symbol: Option<Ustr>, // unified symbol (coin in Hyperliquid)
111    #[serde(default)]
112    pub params: Option<serde_json::Value>, // {"interval":"1m","user":"0x123"} etc.
113}
114
115/// Canonical inbound (single tagged enum). Unknown stays debuggable.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
118pub enum WsInbound {
119    Trades(Vec<WsTrade>),
120    L2Book(WsBook),
121    Bbo(WsBbo),
122    Candle(Vec<WsCandle>),
123    AllMids(Vec<WsMid>),
124    UserFills(Vec<WsFill>),
125    UserFundings(Vec<WsFunding>),
126    UserEvents(Vec<WsUserEvent>),
127
128    SubscriptionResponse(SubResp),
129    Pong(Option<i64>),
130    Notification(Notice),
131    Post(PostAck),
132
133    #[serde(other)]
134    Unknown,
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct SubResp {
139    pub ok: bool,
140    pub id: Option<String>,
141    pub message: Option<String>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct Notice {
146    pub code: Option<String>,
147    pub msg: Option<String>,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct PostAck {
152    pub id: String,
153    pub ok: bool,
154    pub error: Option<String>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct WsTrade {
159    pub instrument: Ustr,
160    #[serde(
161        serialize_with = "serialize_decimal",
162        deserialize_with = "deserialize_decimal"
163    )]
164    pub px: Decimal,
165    #[serde(
166        serialize_with = "serialize_decimal",
167        deserialize_with = "deserialize_decimal"
168    )]
169    pub qty: Decimal,
170    pub side: Side,
171    pub ts: i64, // ms
172    pub id: Option<String>,
173}
174
175#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
176#[serde(rename_all = "lowercase")]
177pub enum Side {
178    Buy,
179    Sell,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct WsBbo {
184    pub instrument: Ustr,
185    #[serde(
186        serialize_with = "serialize_decimal",
187        deserialize_with = "deserialize_decimal"
188    )]
189    pub bid_px: Decimal,
190    #[serde(
191        serialize_with = "serialize_decimal",
192        deserialize_with = "deserialize_decimal"
193    )]
194    pub bid_qty: Decimal,
195    #[serde(
196        serialize_with = "serialize_decimal",
197        deserialize_with = "deserialize_decimal"
198    )]
199    pub ask_px: Decimal,
200    #[serde(
201        serialize_with = "serialize_decimal",
202        deserialize_with = "deserialize_decimal"
203    )]
204    pub ask_qty: Decimal,
205    pub ts: i64, // ms
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct WsCandle {
210    pub instrument: Ustr,
211    pub interval: HyperliquidBarInterval,
212    pub open_ts: i64,
213    #[serde(
214        serialize_with = "serialize_decimal",
215        deserialize_with = "deserialize_decimal"
216    )]
217    pub o: Decimal,
218    #[serde(
219        serialize_with = "serialize_decimal",
220        deserialize_with = "deserialize_decimal"
221    )]
222    pub h: Decimal,
223    #[serde(
224        serialize_with = "serialize_decimal",
225        deserialize_with = "deserialize_decimal"
226    )]
227    pub l: Decimal,
228    #[serde(
229        serialize_with = "serialize_decimal",
230        deserialize_with = "deserialize_decimal"
231    )]
232    pub c: Decimal,
233    #[serde(
234        serialize_with = "serialize_decimal",
235        deserialize_with = "deserialize_decimal"
236    )]
237    pub v: Decimal,
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct WsBook {
242    pub instrument: Ustr,
243    pub is_snapshot: bool,
244    pub seq: Option<u64>,
245    pub checksum: Option<u32>,
246    pub bids: Vec<Level>,
247    pub asks: Vec<Level>,
248    pub ts: i64, // ms
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct Level {
253    #[serde(
254        serialize_with = "serialize_decimal",
255        deserialize_with = "deserialize_decimal"
256    )]
257    pub px: Decimal,
258    #[serde(
259        serialize_with = "serialize_decimal",
260        deserialize_with = "deserialize_decimal"
261    )]
262    pub qty: Decimal,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct WsMid {
267    pub symbol: String,
268    #[serde(
269        serialize_with = "serialize_decimal",
270        deserialize_with = "deserialize_decimal"
271    )]
272    pub mid: Decimal,
273    pub ts: Option<i64>,
274}
275
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct WsFill {
278    pub symbol: String,
279    pub order_id: String,
280    pub trade_id: String,
281    #[serde(
282        serialize_with = "serialize_decimal",
283        deserialize_with = "deserialize_decimal"
284    )]
285    pub px: Decimal,
286    #[serde(
287        serialize_with = "serialize_decimal",
288        deserialize_with = "deserialize_decimal"
289    )]
290    pub qty: Decimal,
291    pub side: Side,
292    pub ts: i64,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct WsFunding {
297    pub symbol: String,
298    #[serde(
299        serialize_with = "serialize_decimal",
300        deserialize_with = "deserialize_decimal"
301    )]
302    pub rate: Decimal,
303    pub ts: i64,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct WsUserEvent {
308    pub event_type: String,
309    pub data: serde_json::Value,
310    pub ts: i64,
311}
312
313fn serialize_decimal<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
314    s.serialize_str(&d.normalize().to_string())
315}
316
317fn deserialize_decimal<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
318    let v = serde_json::Value::deserialize(d)?;
319    match v {
320        serde_json::Value::String(s) => Decimal::from_str(&s).map_err(serde::de::Error::custom),
321        serde_json::Value::Number(n) => {
322            Decimal::from_str(&n.to_string()).map_err(serde::de::Error::custom)
323        }
324        _ => Err(serde::de::Error::custom(
325            "expected decimal string or number",
326        )),
327    }
328}
329
330/// Convert normalized outbound message to Hyperliquid native format.
331pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
332    match msg {
333        WsOutbound::Subscribe { args, id: _ } => {
334            // Convert first SubArg to Hyperliquid SubscriptionRequest
335            if let Some(arg) = args.first() {
336                let subscription = match arg.channel {
337                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
338                        coin: arg.symbol.unwrap_or_default(),
339                    },
340                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
341                        coin: arg.symbol.unwrap_or_default(),
342                        n_sig_figs: arg
343                            .params
344                            .as_ref()
345                            .and_then(|p| p.get("nSigFigs"))
346                            .and_then(|v| v.as_u64())
347                            .map(|u| u as u32),
348                        mantissa: arg
349                            .params
350                            .as_ref()
351                            .and_then(|p| p.get("mantissa"))
352                            .and_then(|v| v.as_u64())
353                            .map(|u| u as u32),
354                    },
355                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
356                        coin: arg.symbol.unwrap_or_default(),
357                    },
358                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
359                        coin: arg.symbol.unwrap_or_default(),
360                        interval: arg
361                            .params
362                            .as_ref()
363                            .and_then(|p| p.get("interval"))
364                            .and_then(|v| v.as_str())
365                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
366                            .unwrap_or(OneMinute),
367                    },
368                    HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
369                        dex: arg
370                            .params
371                            .as_ref()
372                            .and_then(|p| p.get("dex"))
373                            .and_then(|v| v.as_str())
374                            .map(|s| s.to_string()),
375                    },
376                    HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
377                        user: arg
378                            .params
379                            .as_ref()
380                            .and_then(|p| p.get("user"))
381                            .and_then(|v| v.as_str())
382                            .unwrap_or_default()
383                            .to_string(),
384                    },
385                    _ => SubscriptionRequest::AllMids { dex: None }, // Default fallback
386                };
387
388                HyperliquidWsRequest::Subscribe { subscription }
389            } else {
390                HyperliquidWsRequest::Ping // Fallback
391            }
392        }
393        WsOutbound::Unsubscribe { args, id: _ } => {
394            if let Some(arg) = args.first() {
395                let subscription = match arg.channel {
396                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
397                        coin: arg.symbol.unwrap_or_default(),
398                    },
399                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
400                        coin: arg.symbol.unwrap_or_default(),
401                        n_sig_figs: None,
402                        mantissa: None,
403                    },
404                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
405                        coin: arg.symbol.unwrap_or_default(),
406                    },
407                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
408                        coin: arg.symbol.unwrap_or_default(),
409                        interval: arg
410                            .params
411                            .as_ref()
412                            .and_then(|p| p.get("interval"))
413                            .and_then(|v| v.as_str())
414                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
415                            .unwrap_or(OneMinute),
416                    },
417                    _ => SubscriptionRequest::AllMids { dex: None },
418                };
419
420                HyperliquidWsRequest::Unsubscribe { subscription }
421            } else {
422                HyperliquidWsRequest::Ping
423            }
424        }
425        WsOutbound::Ping => HyperliquidWsRequest::Ping,
426        WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
427            id: id.parse::<u64>().unwrap_or(1),
428            request: PostRequest::Info {
429                payload: body.clone(),
430            },
431        },
432        WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
433            id: 1,
434            request: PostRequest::Info {
435                payload: payload.clone(),
436            }, // Simplified for now
437        },
438    }
439}
440
441/// Convert Hyperliquid native message to normalized inbound format.
442pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
443    match msg {
444        HyperliquidWsMessage::SubscriptionResponse { data } => {
445            WsInbound::SubscriptionResponse(SubResp {
446                ok: true,
447                id: None,
448                message: Some(format!("Subscribed to {:?}", data)),
449            })
450        }
451        HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
452            id: data.id.to_string(),
453            ok: true,
454            error: None,
455        }),
456        HyperliquidWsMessage::Trades { data } => {
457            let trades = data
458                .iter()
459                .map(|t| WsTrade {
460                    instrument: t.coin,
461                    px: Decimal::from_str(&t.px).unwrap_or_default(),
462                    qty: Decimal::from_str(&t.sz).unwrap_or_default(),
463                    side: match t.side {
464                        HyperliquidSide::Sell => Side::Sell,
465                        HyperliquidSide::Buy => Side::Buy,
466                    },
467                    ts: t.time as i64,
468                    id: Some(t.tid.to_string()),
469                })
470                .collect();
471            WsInbound::Trades(trades)
472        }
473        HyperliquidWsMessage::L2Book { data } => {
474            let bids = data.levels[0]
475                .iter()
476                .filter(|l| l.n > 0) // Active levels
477                .map(|l| Level {
478                    px: Decimal::from_str(&l.px).unwrap_or_default(),
479                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
480                })
481                .collect();
482
483            let asks = data.levels[1]
484                .iter()
485                .filter(|l| l.n > 0) // Active levels
486                .map(|l| Level {
487                    px: Decimal::from_str(&l.px).unwrap_or_default(),
488                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
489                })
490                .collect();
491
492            WsInbound::L2Book(WsBook {
493                instrument: data.coin,
494                is_snapshot: true, // Hyperliquid sends snapshots
495                seq: Some(data.time),
496                checksum: None,
497                bids,
498                asks,
499                ts: data.time as i64,
500            })
501        }
502        HyperliquidWsMessage::Bbo { data } => {
503            // Access bid and ask from the bbo array: [bid, ask]
504            let default_level = WsLevelData {
505                px: "0".to_string(),
506                sz: "0".to_string(),
507                n: 0,
508            };
509            let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
510            let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
511
512            WsInbound::Bbo(WsBbo {
513                instrument: data.coin,
514                bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
515                bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
516                ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
517                ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
518                ts: data.time as i64,
519            })
520        }
521        HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
522            Ok(interval) => {
523                let candle = WsCandle {
524                    instrument: data.s,
525                    interval,
526                    open_ts: data.t as i64,
527                    o: Decimal::from_str(&data.o).unwrap_or_default(),
528                    h: Decimal::from_str(&data.h).unwrap_or_default(),
529                    l: Decimal::from_str(&data.l).unwrap_or_default(),
530                    c: Decimal::from_str(&data.c).unwrap_or_default(),
531                    v: Decimal::from_str(&data.v).unwrap_or_default(),
532                };
533                WsInbound::Candle(vec![candle])
534            }
535            Err(e) => {
536                tracing::error!("Failed to parse candle interval '{}': {}", data.i, e);
537                WsInbound::Unknown
538            }
539        },
540        HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
541            code: None,
542            msg: Some(data.notification.clone()),
543        }),
544        HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
545        _ => WsInbound::Unknown,
546    }
547}