nautilus_hyperliquid/websocket/
codec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use ustr::Ustr;
21
22use crate::{
23    common::enums::{
24        HyperliquidBarInterval::{self, OneMinute},
25        HyperliquidSide,
26    },
27    websocket::{
28        HyperliquidWsChannel, HyperliquidWsError,
29        messages::{
30            HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
31            WsLevelData,
32        },
33    },
34};
35
36/// Codec for encoding and decoding Hyperliquid WebSocket messages.
37///
38/// This struct provides methods to validate URLs and serialize/deserialize messages,
39/// according to the Hyperliquid WebSocket protocol.
40#[derive(Debug, Default)]
41pub struct HyperliquidCodec;
42
43impl HyperliquidCodec {
44    /// Creates a new Hyperliquid codec instance.
45    pub fn new() -> Self {
46        Self
47    }
48
49    /// Validates that a URL is a proper WebSocket URL.
50    pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
51        if url.starts_with("ws://") || url.starts_with("wss://") {
52            Ok(())
53        } else {
54            Err(HyperliquidWsError::UrlParsing(format!(
55                "URL must start with ws:// or wss://, was: {url}"
56            )))
57        }
58    }
59
60    /// Encodes a WebSocket request to JSON bytes.
61    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
62        serde_json::to_vec(request).map_err(|e| {
63            HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
64        })
65    }
66
67    /// Decodes JSON bytes to a WebSocket message.
68    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
69        serde_json::from_slice(data).map_err(|e| {
70            HyperliquidWsError::MessageDeserialization(format!(
71                "Failed to deserialize message: {e}"
72            ))
73        })
74    }
75}
76
/// Canonical outbound (mirrors OKX/BitMEX "op + args" pattern).
///
/// Serialized as an internally tagged enum: the camelCase variant name is
/// stored under the `op` key next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "camelCase")]
pub enum WsOutbound {
    /// Subscription request carrying one or more subscription arguments and an
    /// optional caller-chosen request id.
    Subscribe {
        args: Vec<SubArg>,
        id: Option<String>,
    },
    /// Unsubscription request; same shape as `Subscribe`.
    Unsubscribe {
        args: Vec<SubArg>,
        id: Option<String>,
    },
    /// Keep-alive ping without a payload.
    Ping,
    /// Generic post request: a request id, a path and an arbitrary JSON body.
    Post {
        id: String,
        path: String,
        body: serde_json::Value,
    },
    /// Authentication request with an arbitrary JSON payload.
    Auth {
        payload: serde_json::Value,
    },
}
99
// Type aliases offering alternative names for the subscription argument and
// trade side types.
pub type SubRequest = SubArg;
pub type TradeSide = Side;
103
/// A single subscription argument: the channel plus an optional symbol and
/// optional channel-specific parameters.
///
/// `symbol` and `params` fall back to `None` when absent from the input
/// (`#[serde(default)]`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubArg {
    /// Channel to subscribe to or unsubscribe from.
    pub channel: HyperliquidWsChannel,
    #[serde(default)]
    pub symbol: Option<Ustr>, // unified symbol (coin in Hyperliquid)
    #[serde(default)]
    pub params: Option<serde_json::Value>, // {"interval":"1m","user":"0x123"} etc.
}
112
/// Canonical inbound (single tagged enum). Unknown stays debuggable.
///
/// Serialized as an adjacently tagged enum: the camelCase variant name is
/// stored under `channel` and the variant payload under `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
pub enum WsInbound {
    // Data payloads.
    Trades(Vec<WsTrade>),
    L2Book(WsBook),
    Bbo(WsBbo),
    Candle(Vec<WsCandle>),
    AllMids(Vec<WsMid>),
    UserFills(Vec<WsFill>),
    UserFundings(Vec<WsFunding>),
    UserEvents(Vec<WsUserEvent>),

    // Control / acknowledgement payloads.
    SubscriptionResponse(SubResp),
    Pong(Option<i64>),
    Notification(Notice),
    Post(PostAck),

    /// Catch-all used when the `channel` tag matches no other variant.
    #[serde(other)]
    Unknown,
}
134
/// Normalized acknowledgement of a subscription request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubResp {
    /// Whether the request was accepted.
    pub ok: bool,
    /// Request id, when one is available.
    pub id: Option<String>,
    /// Optional human-readable detail.
    pub message: Option<String>,
}
141
/// Normalized notification message; both fields are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notice {
    /// Optional notification code.
    pub code: Option<String>,
    /// Optional notification text.
    pub msg: Option<String>,
}
147
/// Normalized acknowledgement of a post request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostAck {
    /// Id of the post request being acknowledged.
    pub id: String,
    /// Whether the request succeeded.
    pub ok: bool,
    /// Optional error description.
    pub error: Option<String>,
}
154
/// Normalized trade.
///
/// Decimal fields are (de)serialized through `serialize_decimal` /
/// `deserialize_decimal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsTrade {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Trade price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Trade quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
    /// Trade side.
    pub side: Side,
    pub ts: i64, // ms
    /// Optional trade id.
    pub id: Option<String>,
}
172
/// Normalized side, serialized in lowercase (`"buy"` / `"sell"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Side {
    Buy,
    Sell,
}
179
/// Normalized best bid / best offer update.
///
/// Decimal fields are (de)serialized through `serialize_decimal` /
/// `deserialize_decimal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsBbo {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Best bid price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub bid_px: Decimal,
    /// Best bid quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub bid_qty: Decimal,
    /// Best ask price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub ask_px: Decimal,
    /// Best ask quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub ask_qty: Decimal,
    pub ts: i64, // ms
}
205
/// Normalized candle (bar).
///
/// Decimal fields are (de)serialized through `serialize_decimal` /
/// `deserialize_decimal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsCandle {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Bar interval.
    pub interval: HyperliquidBarInterval,
    /// Candle open timestamp (presumably milliseconds, like the other `ts` fields — confirm).
    pub open_ts: i64,
    /// Open price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub o: Decimal,
    /// High price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub h: Decimal,
    /// Low price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub l: Decimal,
    /// Close price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub c: Decimal,
    /// Volume.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub v: Decimal,
}
237
/// Normalized order book message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsBook {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Whether this message is a full snapshot rather than an incremental update.
    pub is_snapshot: bool,
    /// Optional sequence number.
    pub seq: Option<u64>,
    /// Optional checksum.
    pub checksum: Option<u32>,
    /// Bid-side levels.
    pub bids: Vec<Level>,
    /// Ask-side levels.
    pub asks: Vec<Level>,
    pub ts: i64, // ms
}
248
/// A single order book level: price and quantity.
///
/// Both fields are (de)serialized through `serialize_decimal` /
/// `deserialize_decimal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Level {
    /// Level price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Quantity at this level.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
}
262
/// Normalized mid price for one symbol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsMid {
    /// Symbol the mid price refers to.
    pub symbol: String,
    /// Mid price, (de)serialized through `serialize_decimal` / `deserialize_decimal`.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub mid: Decimal,
    /// Optional timestamp (presumably milliseconds — confirm).
    pub ts: Option<i64>,
}
273
/// Normalized user fill.
///
/// Decimal fields are (de)serialized through `serialize_decimal` /
/// `deserialize_decimal`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsFill {
    /// Symbol that was filled.
    pub symbol: String,
    /// Id of the order that was filled.
    pub order_id: String,
    /// Id of the resulting trade.
    pub trade_id: String,
    /// Fill price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Fill quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
    /// Fill side.
    pub side: Side,
    /// Timestamp (presumably milliseconds — confirm).
    pub ts: i64,
}
292
/// Normalized user funding entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsFunding {
    /// Symbol the funding applies to.
    pub symbol: String,
    /// Funding rate, (de)serialized through `serialize_decimal` / `deserialize_decimal`.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub rate: Decimal,
    /// Timestamp (presumably milliseconds — confirm).
    pub ts: i64,
}
303
/// Normalized user event with an untyped JSON payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsUserEvent {
    /// Name of the event kind.
    pub event_type: String,
    /// Raw event payload, kept as arbitrary JSON.
    pub data: serde_json::Value,
    /// Timestamp (presumably milliseconds — confirm).
    pub ts: i64,
}
310
311fn serialize_decimal<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
312    s.serialize_str(&d.normalize().to_string())
313}
314
315fn deserialize_decimal<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
316    let v = serde_json::Value::deserialize(d)?;
317    match v {
318        serde_json::Value::String(s) => Decimal::from_str(&s).map_err(serde::de::Error::custom),
319        serde_json::Value::Number(n) => {
320            Decimal::from_str(&n.to_string()).map_err(serde::de::Error::custom)
321        }
322        _ => Err(serde::de::Error::custom(
323            "expected decimal string or number",
324        )),
325    }
326}
327
328/// Convert normalized outbound message to Hyperliquid native format.
329pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
330    match msg {
331        WsOutbound::Subscribe { args, id: _ } => {
332            // Convert first SubArg to Hyperliquid SubscriptionRequest
333            if let Some(arg) = args.first() {
334                let subscription = match arg.channel {
335                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
336                        coin: arg.symbol.unwrap_or_default(),
337                    },
338                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
339                        coin: arg.symbol.unwrap_or_default(),
340                        n_sig_figs: arg
341                            .params
342                            .as_ref()
343                            .and_then(|p| p.get("nSigFigs"))
344                            .and_then(|v| v.as_u64())
345                            .map(|u| u as u32),
346                        mantissa: arg
347                            .params
348                            .as_ref()
349                            .and_then(|p| p.get("mantissa"))
350                            .and_then(|v| v.as_u64())
351                            .map(|u| u as u32),
352                    },
353                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
354                        coin: arg.symbol.unwrap_or_default(),
355                    },
356                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
357                        coin: arg.symbol.unwrap_or_default(),
358                        interval: arg
359                            .params
360                            .as_ref()
361                            .and_then(|p| p.get("interval"))
362                            .and_then(|v| v.as_str())
363                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
364                            .unwrap_or(OneMinute),
365                    },
366                    HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
367                        dex: arg
368                            .params
369                            .as_ref()
370                            .and_then(|p| p.get("dex"))
371                            .and_then(|v| v.as_str())
372                            .map(|s| s.to_string()),
373                    },
374                    HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
375                        user: arg
376                            .params
377                            .as_ref()
378                            .and_then(|p| p.get("user"))
379                            .and_then(|v| v.as_str())
380                            .unwrap_or_default()
381                            .to_string(),
382                    },
383                    _ => SubscriptionRequest::AllMids { dex: None }, // Default fallback
384                };
385
386                HyperliquidWsRequest::Subscribe { subscription }
387            } else {
388                HyperliquidWsRequest::Ping // Fallback
389            }
390        }
391        WsOutbound::Unsubscribe { args, id: _ } => {
392            if let Some(arg) = args.first() {
393                let subscription = match arg.channel {
394                    HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
395                        coin: arg.symbol.unwrap_or_default(),
396                    },
397                    HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
398                        coin: arg.symbol.unwrap_or_default(),
399                        n_sig_figs: None,
400                        mantissa: None,
401                    },
402                    HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
403                        coin: arg.symbol.unwrap_or_default(),
404                    },
405                    HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
406                        coin: arg.symbol.unwrap_or_default(),
407                        interval: arg
408                            .params
409                            .as_ref()
410                            .and_then(|p| p.get("interval"))
411                            .and_then(|v| v.as_str())
412                            .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
413                            .unwrap_or(OneMinute),
414                    },
415                    _ => SubscriptionRequest::AllMids { dex: None },
416                };
417
418                HyperliquidWsRequest::Unsubscribe { subscription }
419            } else {
420                HyperliquidWsRequest::Ping
421            }
422        }
423        WsOutbound::Ping => HyperliquidWsRequest::Ping,
424        WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
425            id: id.parse::<u64>().unwrap_or(1),
426            request: PostRequest::Info {
427                payload: body.clone(),
428            },
429        },
430        WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
431            id: 1,
432            request: PostRequest::Info {
433                payload: payload.clone(),
434            }, // Simplified for now
435        },
436    }
437}
438
439/// Convert Hyperliquid native message to normalized inbound format.
440pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
441    match msg {
442        HyperliquidWsMessage::SubscriptionResponse { data } => {
443            WsInbound::SubscriptionResponse(SubResp {
444                ok: true,
445                id: None,
446                message: Some(format!("Subscribed to {data:?}")),
447            })
448        }
449        HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
450            id: data.id.to_string(),
451            ok: true,
452            error: None,
453        }),
454        HyperliquidWsMessage::Trades { data } => {
455            let trades = data
456                .iter()
457                .map(|t| WsTrade {
458                    instrument: t.coin,
459                    px: Decimal::from_str(&t.px).unwrap_or_default(),
460                    qty: Decimal::from_str(&t.sz).unwrap_or_default(),
461                    side: match t.side {
462                        HyperliquidSide::Sell => Side::Sell,
463                        HyperliquidSide::Buy => Side::Buy,
464                    },
465                    ts: t.time as i64,
466                    id: Some(t.tid.to_string()),
467                })
468                .collect();
469            WsInbound::Trades(trades)
470        }
471        HyperliquidWsMessage::L2Book { data } => {
472            let bids = data.levels[0]
473                .iter()
474                .filter(|l| l.n > 0) // Active levels
475                .map(|l| Level {
476                    px: Decimal::from_str(&l.px).unwrap_or_default(),
477                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
478                })
479                .collect();
480
481            let asks = data.levels[1]
482                .iter()
483                .filter(|l| l.n > 0) // Active levels
484                .map(|l| Level {
485                    px: Decimal::from_str(&l.px).unwrap_or_default(),
486                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
487                })
488                .collect();
489
490            WsInbound::L2Book(WsBook {
491                instrument: data.coin,
492                is_snapshot: true, // Hyperliquid sends snapshots
493                seq: Some(data.time),
494                checksum: None,
495                bids,
496                asks,
497                ts: data.time as i64,
498            })
499        }
500        HyperliquidWsMessage::Bbo { data } => {
501            // Access bid and ask from the bbo array: [bid, ask]
502            let default_level = WsLevelData {
503                px: "0".to_string(),
504                sz: "0".to_string(),
505                n: 0,
506            };
507            let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
508            let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
509
510            WsInbound::Bbo(WsBbo {
511                instrument: data.coin,
512                bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
513                bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
514                ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
515                ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
516                ts: data.time as i64,
517            })
518        }
519        HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
520            Ok(interval) => {
521                let candle = WsCandle {
522                    instrument: data.s,
523                    interval,
524                    open_ts: data.t as i64,
525                    o: Decimal::from_str(&data.o).unwrap_or_default(),
526                    h: Decimal::from_str(&data.h).unwrap_or_default(),
527                    l: Decimal::from_str(&data.l).unwrap_or_default(),
528                    c: Decimal::from_str(&data.c).unwrap_or_default(),
529                    v: Decimal::from_str(&data.v).unwrap_or_default(),
530                };
531                WsInbound::Candle(vec![candle])
532            }
533            Err(e) => {
534                tracing::error!("Failed to parse candle interval '{}': {}", data.i, e);
535                WsInbound::Unknown
536            }
537        },
538        HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
539            code: None,
540            msg: Some(data.notification.clone()),
541        }),
542        HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
543        _ => WsInbound::Unknown,
544    }
545}