Skip to main content

nautilus_hyperliquid/websocket/
codec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use nautilus_core::serialization::{
19    deserialize_decimal, serialize_decimal_as_str as serialize_decimal,
20};
21use rust_decimal::Decimal;
22use serde::{Deserialize, Serialize};
23use ustr::Ustr;
24
25use crate::{
26    common::enums::{
27        HyperliquidBarInterval::{self, OneMinute},
28        HyperliquidSide,
29    },
30    websocket::{
31        HyperliquidWsChannel, HyperliquidWsError,
32        messages::{
33            HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
34            WsLevelData,
35        },
36    },
37};
38
39/// Codec for encoding and decoding Hyperliquid WebSocket messages.
40///
41/// This struct provides methods to validate URLs and serialize/deserialize messages,
42/// according to the Hyperliquid WebSocket protocol.
43#[derive(Debug, Default)]
44pub struct HyperliquidCodec;
45
46impl HyperliquidCodec {
47    /// Creates a new Hyperliquid codec instance.
48    pub fn new() -> Self {
49        Self
50    }
51
52    /// Validates that a URL is a proper WebSocket URL.
53    pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
54        if url.starts_with("ws://") || url.starts_with("wss://") {
55            Ok(())
56        } else {
57            Err(HyperliquidWsError::UrlParsing(format!(
58                "URL must start with ws:// or wss://, was: {url}"
59            )))
60        }
61    }
62
63    /// Encodes a WebSocket request to JSON bytes.
64    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
65        serde_json::to_vec(request).map_err(|e| {
66            HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
67        })
68    }
69
70    /// Decodes JSON bytes to a WebSocket message.
71    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
72        serde_json::from_slice(data).map_err(|e| {
73            HyperliquidWsError::MessageDeserialization(format!(
74                "Failed to deserialize message: {e}"
75            ))
76        })
77    }
78}
79
80/// Canonical outbound (mirrors OKX/BitMEX "op + args" pattern).
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(tag = "op", rename_all = "camelCase")]
83pub enum WsOutbound {
84    Subscribe {
85        args: Vec<SubArg>,
86        id: Option<String>,
87    },
88    Unsubscribe {
89        args: Vec<SubArg>,
90        id: Option<String>,
91    },
92    Ping,
93    Post {
94        id: String,
95        path: String,
96        body: serde_json::Value,
97    },
98    Auth {
99        payload: serde_json::Value,
100    },
101}
102
103// Type aliases for convenience and compatibility with your request
104pub type SubRequest = SubArg;
105pub type TradeSide = Side;
106
107#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
108pub struct SubArg {
109    pub channel: HyperliquidWsChannel,
110    #[serde(default)]
111    pub symbol: Option<Ustr>, // unified symbol (coin in Hyperliquid)
112    #[serde(default)]
113    pub params: Option<serde_json::Value>, // {"interval":"1m","user":"0x123"} etc.
114}
115
116/// Canonical inbound (single tagged enum). Unknown stays debuggable.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
119pub enum WsInbound {
120    Trades(Vec<WsTrade>),
121    L2Book(WsBook),
122    Bbo(WsBbo),
123    Candle(Vec<WsCandle>),
124    AllMids(Vec<WsMid>),
125    UserFills(Vec<WsFill>),
126    UserFundings(Vec<WsFunding>),
127    UserEvents(Vec<WsUserEvent>),
128    SubscriptionResponse(SubResp),
129    Pong(Option<i64>),
130    Notification(Notice),
131    Post(PostAck),
132    #[serde(other)]
133    Unknown,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct SubResp {
138    pub ok: bool,
139    pub id: Option<String>,
140    pub message: Option<String>,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct Notice {
145    pub code: Option<String>,
146    pub msg: Option<String>,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct PostAck {
151    pub id: String,
152    pub ok: bool,
153    pub error: Option<String>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct WsTrade {
158    pub instrument: Ustr,
159    #[serde(
160        serialize_with = "serialize_decimal",
161        deserialize_with = "deserialize_decimal"
162    )]
163    pub px: Decimal,
164    #[serde(
165        serialize_with = "serialize_decimal",
166        deserialize_with = "deserialize_decimal"
167    )]
168    pub qty: Decimal,
169    pub side: Side,
170    pub ts: i64, // ms
171    pub id: Option<String>,
172}
173
174#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
175#[serde(rename_all = "lowercase")]
176pub enum Side {
177    Buy,
178    Sell,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct WsBbo {
183    pub instrument: Ustr,
184    #[serde(
185        serialize_with = "serialize_decimal",
186        deserialize_with = "deserialize_decimal"
187    )]
188    pub bid_px: Decimal,
189    #[serde(
190        serialize_with = "serialize_decimal",
191        deserialize_with = "deserialize_decimal"
192    )]
193    pub bid_qty: Decimal,
194    #[serde(
195        serialize_with = "serialize_decimal",
196        deserialize_with = "deserialize_decimal"
197    )]
198    pub ask_px: Decimal,
199    #[serde(
200        serialize_with = "serialize_decimal",
201        deserialize_with = "deserialize_decimal"
202    )]
203    pub ask_qty: Decimal,
204    pub ts: i64, // ms
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct WsCandle {
209    pub instrument: Ustr,
210    pub interval: HyperliquidBarInterval,
211    pub open_ts: i64,
212    #[serde(
213        serialize_with = "serialize_decimal",
214        deserialize_with = "deserialize_decimal"
215    )]
216    pub o: Decimal,
217    #[serde(
218        serialize_with = "serialize_decimal",
219        deserialize_with = "deserialize_decimal"
220    )]
221    pub h: Decimal,
222    #[serde(
223        serialize_with = "serialize_decimal",
224        deserialize_with = "deserialize_decimal"
225    )]
226    pub l: Decimal,
227    #[serde(
228        serialize_with = "serialize_decimal",
229        deserialize_with = "deserialize_decimal"
230    )]
231    pub c: Decimal,
232    #[serde(
233        serialize_with = "serialize_decimal",
234        deserialize_with = "deserialize_decimal"
235    )]
236    pub v: Decimal,
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct WsBook {
241    pub instrument: Ustr,
242    pub is_snapshot: bool,
243    pub seq: Option<u64>,
244    pub checksum: Option<u32>,
245    pub bids: Vec<Level>,
246    pub asks: Vec<Level>,
247    pub ts: i64, // ms
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct Level {
252    #[serde(
253        serialize_with = "serialize_decimal",
254        deserialize_with = "deserialize_decimal"
255    )]
256    pub px: Decimal,
257    #[serde(
258        serialize_with = "serialize_decimal",
259        deserialize_with = "deserialize_decimal"
260    )]
261    pub qty: Decimal,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct WsMid {
266    pub symbol: String,
267    #[serde(
268        serialize_with = "serialize_decimal",
269        deserialize_with = "deserialize_decimal"
270    )]
271    pub mid: Decimal,
272    pub ts: Option<i64>,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub struct WsFill {
277    pub symbol: String,
278    pub order_id: String,
279    pub trade_id: String,
280    #[serde(
281        serialize_with = "serialize_decimal",
282        deserialize_with = "deserialize_decimal"
283    )]
284    pub px: Decimal,
285    #[serde(
286        serialize_with = "serialize_decimal",
287        deserialize_with = "deserialize_decimal"
288    )]
289    pub qty: Decimal,
290    pub side: Side,
291    pub ts: i64,
292}
293
294#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct WsFunding {
296    pub symbol: String,
297    #[serde(
298        serialize_with = "serialize_decimal",
299        deserialize_with = "deserialize_decimal"
300    )]
301    pub rate: Decimal,
302    pub ts: i64,
303}
304
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct WsUserEvent {
307    pub event_type: String,
308    pub data: serde_json::Value,
309    pub ts: i64,
310}
311
312/// Convert normalized outbound message to Hyperliquid native format.
313pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
314    match msg {
315        WsOutbound::Subscribe { args, id: _ } => {
316            // Convert first SubArg to Hyperliquid SubscriptionRequest
317            if let Some(arg) = args.first() {
318                let subscription = match arg.channel {
319                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
320                        coin: arg.symbol.unwrap_or_default(),
321                    },
322                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
323                        coin: arg.symbol.unwrap_or_default(),
324                        n_sig_figs: arg
325                            .params
326                            .as_ref()
327                            .and_then(|p| p.get("nSigFigs"))
328                            .and_then(|v| v.as_u64())
329                            .map(|u| u as u32),
330                        mantissa: arg
331                            .params
332                            .as_ref()
333                            .and_then(|p| p.get("mantissa"))
334                            .and_then(|v| v.as_u64())
335                            .map(|u| u as u32),
336                    },
337                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
338                        coin: arg.symbol.unwrap_or_default(),
339                    },
340                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
341                        coin: arg.symbol.unwrap_or_default(),
342                        interval: arg
343                            .params
344                            .as_ref()
345                            .and_then(|p| p.get("interval"))
346                            .and_then(|v| v.as_str())
347                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
348                            .unwrap_or(OneMinute),
349                    },
350                    HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
351                        dex: arg
352                            .params
353                            .as_ref()
354                            .and_then(|p| p.get("dex"))
355                            .and_then(|v| v.as_str())
356                            .map(|s| s.to_string()),
357                    },
358                    HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
359                        user: arg
360                            .params
361                            .as_ref()
362                            .and_then(|p| p.get("user"))
363                            .and_then(|v| v.as_str())
364                            .unwrap_or_default()
365                            .to_string(),
366                    },
367                    _ => SubscriptionRequest::AllMids { dex: None }, // Default fallback
368                };
369
370                HyperliquidWsRequest::Subscribe { subscription }
371            } else {
372                HyperliquidWsRequest::Ping // Fallback
373            }
374        }
375        WsOutbound::Unsubscribe { args, id: _ } => {
376            if let Some(arg) = args.first() {
377                let subscription = match arg.channel {
378                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
379                        coin: arg.symbol.unwrap_or_default(),
380                    },
381                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
382                        coin: arg.symbol.unwrap_or_default(),
383                        n_sig_figs: None,
384                        mantissa: None,
385                    },
386                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
387                        coin: arg.symbol.unwrap_or_default(),
388                    },
389                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
390                        coin: arg.symbol.unwrap_or_default(),
391                        interval: arg
392                            .params
393                            .as_ref()
394                            .and_then(|p| p.get("interval"))
395                            .and_then(|v| v.as_str())
396                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
397                            .unwrap_or(OneMinute),
398                    },
399                    _ => SubscriptionRequest::AllMids { dex: None },
400                };
401
402                HyperliquidWsRequest::Unsubscribe { subscription }
403            } else {
404                HyperliquidWsRequest::Ping
405            }
406        }
407        WsOutbound::Ping => HyperliquidWsRequest::Ping,
408        WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
409            id: id.parse::<u64>().unwrap_or(1),
410            request: PostRequest::Info {
411                payload: body.clone(),
412            },
413        },
414        WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
415            id: 1,
416            request: PostRequest::Info {
417                payload: payload.clone(),
418            }, // Simplified for now
419        },
420    }
421}
422
423/// Convert Hyperliquid native message to normalized inbound format.
424pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
425    match msg {
426        HyperliquidWsMessage::SubscriptionResponse { data } => {
427            WsInbound::SubscriptionResponse(SubResp {
428                ok: true,
429                id: None,
430                message: Some(format!("Subscribed to {data:?}")),
431            })
432        }
433        HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
434            id: data.id.to_string(),
435            ok: true,
436            error: None,
437        }),
438        HyperliquidWsMessage::Trades { data } => {
439            let trades = data
440                .iter()
441                .map(|t| WsTrade {
442                    instrument: t.coin,
443                    px: Decimal::from_str(&t.px).unwrap_or_default(),
444                    qty: Decimal::from_str(&t.sz).unwrap_or_default(),
445                    side: match t.side {
446                        HyperliquidSide::Sell => Side::Sell,
447                        HyperliquidSide::Buy => Side::Buy,
448                    },
449                    ts: t.time as i64,
450                    id: Some(t.tid.to_string()),
451                })
452                .collect();
453            WsInbound::Trades(trades)
454        }
455        HyperliquidWsMessage::L2Book { data } => {
456            let bids = data.levels[0]
457                .iter()
458                .filter(|l| l.n > 0) // Active levels
459                .map(|l| Level {
460                    px: Decimal::from_str(&l.px).unwrap_or_default(),
461                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
462                })
463                .collect();
464
465            let asks = data.levels[1]
466                .iter()
467                .filter(|l| l.n > 0) // Active levels
468                .map(|l| Level {
469                    px: Decimal::from_str(&l.px).unwrap_or_default(),
470                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
471                })
472                .collect();
473
474            WsInbound::L2Book(WsBook {
475                instrument: data.coin,
476                is_snapshot: true, // Hyperliquid sends snapshots
477                seq: Some(data.time),
478                checksum: None,
479                bids,
480                asks,
481                ts: data.time as i64,
482            })
483        }
484        HyperliquidWsMessage::Bbo { data } => {
485            // Access bid and ask from the bbo array: [bid, ask]
486            let default_level = WsLevelData {
487                px: "0".to_string(),
488                sz: "0".to_string(),
489                n: 0,
490            };
491            let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
492            let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
493
494            WsInbound::Bbo(WsBbo {
495                instrument: data.coin,
496                bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
497                bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
498                ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
499                ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
500                ts: data.time as i64,
501            })
502        }
503        HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
504            Ok(interval) => {
505                let candle = WsCandle {
506                    instrument: data.s,
507                    interval,
508                    open_ts: data.t as i64,
509                    o: Decimal::from_str(&data.o).unwrap_or_default(),
510                    h: Decimal::from_str(&data.h).unwrap_or_default(),
511                    l: Decimal::from_str(&data.l).unwrap_or_default(),
512                    c: Decimal::from_str(&data.c).unwrap_or_default(),
513                    v: Decimal::from_str(&data.v).unwrap_or_default(),
514                };
515                WsInbound::Candle(vec![candle])
516            }
517            Err(e) => {
518                log::error!("Failed to parse candle interval '{}': {}", data.i, e);
519                WsInbound::Unknown
520            }
521        },
522        HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
523            code: None,
524            msg: Some(data.notification.clone()),
525        }),
526        HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
527        _ => WsInbound::Unknown,
528    }
529}