nautilus_hyperliquid/websocket/
codec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20
21use crate::websocket::messages::{
22    HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest, WsLevelData,
23};
24
25/// Canonical outbound (mirrors OKX/BitMEX "op + args" pattern).
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "op", rename_all = "camelCase")]
28pub enum WsOutbound {
29    Subscribe {
30        args: Vec<SubArg>,
31        id: Option<String>,
32    },
33    Unsubscribe {
34        args: Vec<SubArg>,
35        id: Option<String>,
36    },
37    Ping,
38    Post {
39        id: String,
40        path: String,
41        body: serde_json::Value,
42    },
43    Auth {
44        payload: serde_json::Value,
45    },
46}
47
48// Type aliases for convenience and compatibility with your request
49pub type SubRequest = SubArg;
50pub type TradeSide = Side;
51
52#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
53pub struct SubArg {
54    pub channel: String, // e.g. "trades" | "l2Book" | "bbo" | "candle"
55    #[serde(default)]
56    pub symbol: Option<String>, // unified symbol (coin in Hyperliquid)
57    #[serde(default)]
58    pub params: Option<serde_json::Value>, // {"interval":"1m","user":"0x123"} etc.
59}
60
61/// Canonical inbound (single tagged enum). Unknown stays debuggable.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
64pub enum WsInbound {
65    Trades(Vec<WsTrade>),
66    L2Book(WsBook),
67    Bbo(WsBbo),
68    Candle(Vec<WsCandle>),
69    AllMids(Vec<WsMid>),
70    UserFills(Vec<WsFill>),
71    UserFundings(Vec<WsFunding>),
72    UserEvents(Vec<WsUserEvent>),
73
74    SubscriptionResponse(SubResp),
75    Pong(Option<i64>),
76    Notification(Notice),
77    Post(PostAck),
78
79    #[serde(other)]
80    Unknown,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct SubResp {
85    pub ok: bool,
86    pub id: Option<String>,
87    pub message: Option<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct Notice {
92    pub code: Option<String>,
93    pub msg: Option<String>,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct PostAck {
98    pub id: String,
99    pub ok: bool,
100    pub error: Option<String>,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct WsTrade {
105    pub instrument: String,
106    #[serde(with = "decimal_serde")]
107    pub px: Decimal,
108    #[serde(with = "decimal_serde")]
109    pub qty: Decimal,
110    pub side: Side,
111    pub ts: i64, // ms
112    pub id: Option<String>,
113}
114
115#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
116#[serde(rename_all = "lowercase")]
117pub enum Side {
118    Buy,
119    Sell,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct WsBbo {
124    pub instrument: String,
125    #[serde(with = "decimal_serde")]
126    pub bid_px: Decimal,
127    #[serde(with = "decimal_serde")]
128    pub bid_qty: Decimal,
129    #[serde(with = "decimal_serde")]
130    pub ask_px: Decimal,
131    #[serde(with = "decimal_serde")]
132    pub ask_qty: Decimal,
133    pub ts: i64, // ms
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct WsCandle {
138    pub instrument: String,
139    pub interval: String, // "1m", "5m", ...
140    pub open_ts: i64,
141    #[serde(with = "decimal_serde")]
142    pub o: Decimal,
143    #[serde(with = "decimal_serde")]
144    pub h: Decimal,
145    #[serde(with = "decimal_serde")]
146    pub l: Decimal,
147    #[serde(with = "decimal_serde")]
148    pub c: Decimal,
149    #[serde(with = "decimal_serde")]
150    pub v: Decimal,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct WsBook {
155    pub instrument: String,
156    pub is_snapshot: bool,
157    pub seq: Option<u64>,
158    pub checksum: Option<u32>,
159    pub bids: Vec<Level>,
160    pub asks: Vec<Level>,
161    pub ts: i64, // ms
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct Level {
166    #[serde(with = "decimal_serde")]
167    pub px: Decimal,
168    #[serde(with = "decimal_serde")]
169    pub qty: Decimal,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct WsMid {
174    pub symbol: String,
175    #[serde(with = "decimal_serde")]
176    pub mid: Decimal,
177    pub ts: Option<i64>,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct WsFill {
182    pub symbol: String,
183    pub order_id: String,
184    pub trade_id: String,
185    #[serde(with = "decimal_serde")]
186    pub px: Decimal,
187    #[serde(with = "decimal_serde")]
188    pub qty: Decimal,
189    pub side: Side,
190    pub ts: i64,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct WsFunding {
195    pub symbol: String,
196    #[serde(with = "decimal_serde")]
197    pub rate: Decimal,
198    pub ts: i64,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct WsUserEvent {
203    pub event_type: String,
204    pub data: serde_json::Value,
205    pub ts: i64,
206}
207
208// Decimal serde module
209mod decimal_serde {
210    use serde::{Deserializer, Serializer, de::Error};
211
212    use super::*;
213
214    pub fn serialize<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
215        s.serialize_str(&d.normalize().to_string())
216    }
217
218    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
219        let v = serde_json::Value::deserialize(d)?;
220        match v {
221            serde_json::Value::String(s) => Decimal::from_str(&s).map_err(Error::custom),
222            serde_json::Value::Number(n) => {
223                Decimal::from_str(&n.to_string()).map_err(Error::custom)
224            }
225            _ => Err(Error::custom("expected decimal string or number")),
226        }
227    }
228}
229
230/// Convert normalized outbound message to Hyperliquid native format.
231pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
232    match msg {
233        WsOutbound::Subscribe { args, id: _ } => {
234            // Convert first SubArg to Hyperliquid SubscriptionRequest
235            if let Some(arg) = args.first() {
236                let subscription = match arg.channel.as_str() {
237                    "trades" => SubscriptionRequest::Trades {
238                        coin: arg.symbol.clone().unwrap_or_default(),
239                    },
240                    "l2Book" => SubscriptionRequest::L2Book {
241                        coin: arg.symbol.clone().unwrap_or_default(),
242                        n_sig_figs: arg
243                            .params
244                            .as_ref()
245                            .and_then(|p| p.get("nSigFigs"))
246                            .and_then(|v| v.as_u64())
247                            .map(|u| u as u32),
248                        mantissa: arg
249                            .params
250                            .as_ref()
251                            .and_then(|p| p.get("mantissa"))
252                            .and_then(|v| v.as_u64())
253                            .map(|u| u as u32),
254                    },
255                    "bbo" => SubscriptionRequest::Bbo {
256                        coin: arg.symbol.clone().unwrap_or_default(),
257                    },
258                    "candle" => SubscriptionRequest::Candle {
259                        coin: arg.symbol.clone().unwrap_or_default(),
260                        interval: arg
261                            .params
262                            .as_ref()
263                            .and_then(|p| p.get("interval"))
264                            .and_then(|v| v.as_str())
265                            .unwrap_or("1m")
266                            .to_string(),
267                    },
268                    "allMids" => SubscriptionRequest::AllMids {
269                        dex: arg
270                            .params
271                            .as_ref()
272                            .and_then(|p| p.get("dex"))
273                            .and_then(|v| v.as_str())
274                            .map(|s| s.to_string()),
275                    },
276                    "notification" => SubscriptionRequest::Notification {
277                        user: arg
278                            .params
279                            .as_ref()
280                            .and_then(|p| p.get("user"))
281                            .and_then(|v| v.as_str())
282                            .unwrap_or_default()
283                            .to_string(),
284                    },
285                    _ => SubscriptionRequest::AllMids { dex: None }, // Default fallback
286                };
287
288                HyperliquidWsRequest::Subscribe { subscription }
289            } else {
290                HyperliquidWsRequest::Ping // Fallback
291            }
292        }
293        WsOutbound::Unsubscribe { args, id: _ } => {
294            if let Some(arg) = args.first() {
295                let subscription = match arg.channel.as_str() {
296                    "trades" => SubscriptionRequest::Trades {
297                        coin: arg.symbol.clone().unwrap_or_default(),
298                    },
299                    "l2Book" => SubscriptionRequest::L2Book {
300                        coin: arg.symbol.clone().unwrap_or_default(),
301                        n_sig_figs: None,
302                        mantissa: None,
303                    },
304                    "bbo" => SubscriptionRequest::Bbo {
305                        coin: arg.symbol.clone().unwrap_or_default(),
306                    },
307                    "candle" => SubscriptionRequest::Candle {
308                        coin: arg.symbol.clone().unwrap_or_default(),
309                        interval: arg
310                            .params
311                            .as_ref()
312                            .and_then(|p| p.get("interval"))
313                            .and_then(|v| v.as_str())
314                            .unwrap_or("1m")
315                            .to_string(),
316                    },
317                    _ => SubscriptionRequest::AllMids { dex: None },
318                };
319
320                HyperliquidWsRequest::Unsubscribe { subscription }
321            } else {
322                HyperliquidWsRequest::Ping
323            }
324        }
325        WsOutbound::Ping => HyperliquidWsRequest::Ping,
326        WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
327            id: id.parse::<u64>().unwrap_or(1),
328            request: PostRequest::Info {
329                payload: body.clone(),
330            },
331        },
332        WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
333            id: 1,
334            request: PostRequest::Info {
335                payload: payload.clone(),
336            }, // Simplified for now
337        },
338    }
339}
340
341/// Convert Hyperliquid native message to normalized inbound format.
342pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
343    match msg {
344        HyperliquidWsMessage::SubscriptionResponse { data } => {
345            WsInbound::SubscriptionResponse(SubResp {
346                ok: true,
347                id: None,
348                message: Some(format!("Subscribed to {:?}", data)),
349            })
350        }
351        HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
352            id: data.id.to_string(),
353            ok: true,
354            error: None,
355        }),
356        HyperliquidWsMessage::Trades { data } => {
357            let trades = data
358                .iter()
359                .map(|t| WsTrade {
360                    instrument: t.coin.clone(),
361                    px: Decimal::from_str(&t.px).unwrap_or_default(),
362                    qty: Decimal::from_str(&t.sz).unwrap_or_default(),
363                    side: if t.side == "A" { Side::Sell } else { Side::Buy },
364                    ts: t.time as i64,
365                    id: Some(t.tid.to_string()),
366                })
367                .collect();
368            WsInbound::Trades(trades)
369        }
370        HyperliquidWsMessage::L2Book { data } => {
371            let bids = data.levels[0]
372                .iter()
373                .filter(|l| l.n > 0) // Active levels
374                .map(|l| Level {
375                    px: Decimal::from_str(&l.px).unwrap_or_default(),
376                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
377                })
378                .collect();
379
380            let asks = data.levels[1]
381                .iter()
382                .filter(|l| l.n > 0) // Active levels
383                .map(|l| Level {
384                    px: Decimal::from_str(&l.px).unwrap_or_default(),
385                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
386                })
387                .collect();
388
389            WsInbound::L2Book(WsBook {
390                instrument: data.coin.clone(),
391                is_snapshot: true, // Hyperliquid sends snapshots
392                seq: Some(data.time),
393                checksum: None,
394                bids,
395                asks,
396                ts: data.time as i64,
397            })
398        }
399        HyperliquidWsMessage::Bbo { data } => {
400            // Access bid and ask from the bbo array: [bid, ask]
401            let default_level = WsLevelData {
402                px: "0".to_string(),
403                sz: "0".to_string(),
404                n: 0,
405            };
406            let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
407            let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
408
409            WsInbound::Bbo(WsBbo {
410                instrument: data.coin.clone(),
411                bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
412                bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
413                ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
414                ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
415                ts: data.time as i64,
416            })
417        }
418        HyperliquidWsMessage::Candle { data } => {
419            let candles = data
420                .iter()
421                .map(|c| WsCandle {
422                    instrument: c.s.clone(),
423                    interval: c.i.clone(),
424                    open_ts: c.t as i64,
425                    o: Decimal::try_from(c.o).unwrap_or_default(),
426                    h: Decimal::try_from(c.h).unwrap_or_default(),
427                    l: Decimal::try_from(c.l).unwrap_or_default(),
428                    c: Decimal::try_from(c.c).unwrap_or_default(),
429                    v: Decimal::try_from(c.v).unwrap_or_default(),
430                })
431                .collect();
432            WsInbound::Candle(candles)
433        }
434        HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
435            code: None,
436            msg: Some(data.notification.clone()),
437        }),
438        HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
439        _ => WsInbound::Unknown,
440    }
441}