nautilus_hyperliquid/websocket/
codec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20use ustr::Ustr;
21
22use crate::websocket::messages::{
23    HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest, WsLevelData,
24};
25
26/// Canonical outbound (mirrors OKX/BitMEX "op + args" pattern).
27#[derive(Debug, Clone, Serialize, Deserialize)]
28#[serde(tag = "op", rename_all = "camelCase")]
29pub enum WsOutbound {
30    Subscribe {
31        args: Vec<SubArg>,
32        id: Option<String>,
33    },
34    Unsubscribe {
35        args: Vec<SubArg>,
36        id: Option<String>,
37    },
38    Ping,
39    Post {
40        id: String,
41        path: String,
42        body: serde_json::Value,
43    },
44    Auth {
45        payload: serde_json::Value,
46    },
47}
48
49// Type aliases for convenience and compatibility with your request
50pub type SubRequest = SubArg;
51pub type TradeSide = Side;
52
53#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
54pub struct SubArg {
55    pub channel: String, // e.g. "trades" | "l2Book" | "bbo" | "candle"
56    #[serde(default)]
57    pub symbol: Option<Ustr>, // unified symbol (coin in Hyperliquid)
58    #[serde(default)]
59    pub params: Option<serde_json::Value>, // {"interval":"1m","user":"0x123"} etc.
60}
61
62/// Canonical inbound (single tagged enum). Unknown stays debuggable.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
65pub enum WsInbound {
66    Trades(Vec<WsTrade>),
67    L2Book(WsBook),
68    Bbo(WsBbo),
69    Candle(Vec<WsCandle>),
70    AllMids(Vec<WsMid>),
71    UserFills(Vec<WsFill>),
72    UserFundings(Vec<WsFunding>),
73    UserEvents(Vec<WsUserEvent>),
74
75    SubscriptionResponse(SubResp),
76    Pong(Option<i64>),
77    Notification(Notice),
78    Post(PostAck),
79
80    #[serde(other)]
81    Unknown,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct SubResp {
86    pub ok: bool,
87    pub id: Option<String>,
88    pub message: Option<String>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct Notice {
93    pub code: Option<String>,
94    pub msg: Option<String>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct PostAck {
99    pub id: String,
100    pub ok: bool,
101    pub error: Option<String>,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct WsTrade {
106    pub instrument: Ustr,
107    #[serde(with = "decimal_serde")]
108    pub px: Decimal,
109    #[serde(with = "decimal_serde")]
110    pub qty: Decimal,
111    pub side: Side,
112    pub ts: i64, // ms
113    pub id: Option<String>,
114}
115
116#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
117#[serde(rename_all = "lowercase")]
118pub enum Side {
119    Buy,
120    Sell,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct WsBbo {
125    pub instrument: Ustr,
126    #[serde(with = "decimal_serde")]
127    pub bid_px: Decimal,
128    #[serde(with = "decimal_serde")]
129    pub bid_qty: Decimal,
130    #[serde(with = "decimal_serde")]
131    pub ask_px: Decimal,
132    #[serde(with = "decimal_serde")]
133    pub ask_qty: Decimal,
134    pub ts: i64, // ms
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct WsCandle {
139    pub instrument: Ustr,
140    pub interval: String, // "1m", "5m", ...
141    pub open_ts: i64,
142    #[serde(with = "decimal_serde")]
143    pub o: Decimal,
144    #[serde(with = "decimal_serde")]
145    pub h: Decimal,
146    #[serde(with = "decimal_serde")]
147    pub l: Decimal,
148    #[serde(with = "decimal_serde")]
149    pub c: Decimal,
150    #[serde(with = "decimal_serde")]
151    pub v: Decimal,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct WsBook {
156    pub instrument: Ustr,
157    pub is_snapshot: bool,
158    pub seq: Option<u64>,
159    pub checksum: Option<u32>,
160    pub bids: Vec<Level>,
161    pub asks: Vec<Level>,
162    pub ts: i64, // ms
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct Level {
167    #[serde(with = "decimal_serde")]
168    pub px: Decimal,
169    #[serde(with = "decimal_serde")]
170    pub qty: Decimal,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct WsMid {
175    pub symbol: String,
176    #[serde(with = "decimal_serde")]
177    pub mid: Decimal,
178    pub ts: Option<i64>,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct WsFill {
183    pub symbol: String,
184    pub order_id: String,
185    pub trade_id: String,
186    #[serde(with = "decimal_serde")]
187    pub px: Decimal,
188    #[serde(with = "decimal_serde")]
189    pub qty: Decimal,
190    pub side: Side,
191    pub ts: i64,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct WsFunding {
196    pub symbol: String,
197    #[serde(with = "decimal_serde")]
198    pub rate: Decimal,
199    pub ts: i64,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct WsUserEvent {
204    pub event_type: String,
205    pub data: serde_json::Value,
206    pub ts: i64,
207}
208
209// Decimal serde module
210mod decimal_serde {
211    use serde::{Deserializer, Serializer, de::Error};
212
213    use super::*;
214
215    pub fn serialize<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
216        s.serialize_str(&d.normalize().to_string())
217    }
218
219    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
220        let v = serde_json::Value::deserialize(d)?;
221        match v {
222            serde_json::Value::String(s) => Decimal::from_str(&s).map_err(Error::custom),
223            serde_json::Value::Number(n) => {
224                Decimal::from_str(&n.to_string()).map_err(Error::custom)
225            }
226            _ => Err(Error::custom("expected decimal string or number")),
227        }
228    }
229}
230
231/// Convert normalized outbound message to Hyperliquid native format.
232pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
233    match msg {
234        WsOutbound::Subscribe { args, id: _ } => {
235            // Convert first SubArg to Hyperliquid SubscriptionRequest
236            if let Some(arg) = args.first() {
237                let subscription = match arg.channel.as_str() {
238                    "trades" => SubscriptionRequest::Trades {
239                        coin: arg.symbol.unwrap_or_default(),
240                    },
241                    "l2Book" => SubscriptionRequest::L2Book {
242                        coin: arg.symbol.unwrap_or_default(),
243                        n_sig_figs: arg
244                            .params
245                            .as_ref()
246                            .and_then(|p| p.get("nSigFigs"))
247                            .and_then(|v| v.as_u64())
248                            .map(|u| u as u32),
249                        mantissa: arg
250                            .params
251                            .as_ref()
252                            .and_then(|p| p.get("mantissa"))
253                            .and_then(|v| v.as_u64())
254                            .map(|u| u as u32),
255                    },
256                    "bbo" => SubscriptionRequest::Bbo {
257                        coin: arg.symbol.unwrap_or_default(),
258                    },
259                    "candle" => SubscriptionRequest::Candle {
260                        coin: arg.symbol.unwrap_or_default(),
261                        interval: arg
262                            .params
263                            .as_ref()
264                            .and_then(|p| p.get("interval"))
265                            .and_then(|v| v.as_str())
266                            .unwrap_or("1m")
267                            .to_string(),
268                    },
269                    "allMids" => SubscriptionRequest::AllMids {
270                        dex: arg
271                            .params
272                            .as_ref()
273                            .and_then(|p| p.get("dex"))
274                            .and_then(|v| v.as_str())
275                            .map(|s| s.to_string()),
276                    },
277                    "notification" => SubscriptionRequest::Notification {
278                        user: arg
279                            .params
280                            .as_ref()
281                            .and_then(|p| p.get("user"))
282                            .and_then(|v| v.as_str())
283                            .unwrap_or_default()
284                            .to_string(),
285                    },
286                    _ => SubscriptionRequest::AllMids { dex: None }, // Default fallback
287                };
288
289                HyperliquidWsRequest::Subscribe { subscription }
290            } else {
291                HyperliquidWsRequest::Ping // Fallback
292            }
293        }
294        WsOutbound::Unsubscribe { args, id: _ } => {
295            if let Some(arg) = args.first() {
296                let subscription = match arg.channel.as_str() {
297                    "trades" => SubscriptionRequest::Trades {
298                        coin: arg.symbol.unwrap_or_default(),
299                    },
300                    "l2Book" => SubscriptionRequest::L2Book {
301                        coin: arg.symbol.unwrap_or_default(),
302                        n_sig_figs: None,
303                        mantissa: None,
304                    },
305                    "bbo" => SubscriptionRequest::Bbo {
306                        coin: arg.symbol.unwrap_or_default(),
307                    },
308                    "candle" => SubscriptionRequest::Candle {
309                        coin: arg.symbol.unwrap_or_default(),
310                        interval: arg
311                            .params
312                            .as_ref()
313                            .and_then(|p| p.get("interval"))
314                            .and_then(|v| v.as_str())
315                            .unwrap_or("1m")
316                            .to_string(),
317                    },
318                    _ => SubscriptionRequest::AllMids { dex: None },
319                };
320
321                HyperliquidWsRequest::Unsubscribe { subscription }
322            } else {
323                HyperliquidWsRequest::Ping
324            }
325        }
326        WsOutbound::Ping => HyperliquidWsRequest::Ping,
327        WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
328            id: id.parse::<u64>().unwrap_or(1),
329            request: PostRequest::Info {
330                payload: body.clone(),
331            },
332        },
333        WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
334            id: 1,
335            request: PostRequest::Info {
336                payload: payload.clone(),
337            }, // Simplified for now
338        },
339    }
340}
341
342/// Convert Hyperliquid native message to normalized inbound format.
343pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
344    match msg {
345        HyperliquidWsMessage::SubscriptionResponse { data } => {
346            WsInbound::SubscriptionResponse(SubResp {
347                ok: true,
348                id: None,
349                message: Some(format!("Subscribed to {:?}", data)),
350            })
351        }
352        HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
353            id: data.id.to_string(),
354            ok: true,
355            error: None,
356        }),
357        HyperliquidWsMessage::Trades { data } => {
358            let trades = data
359                .iter()
360                .map(|t| WsTrade {
361                    instrument: t.coin,
362                    px: Decimal::from_str(&t.px).unwrap_or_default(),
363                    qty: Decimal::from_str(&t.sz).unwrap_or_default(),
364                    side: if t.side == "A" { Side::Sell } else { Side::Buy },
365                    ts: t.time as i64,
366                    id: Some(t.tid.to_string()),
367                })
368                .collect();
369            WsInbound::Trades(trades)
370        }
371        HyperliquidWsMessage::L2Book { data } => {
372            let bids = data.levels[0]
373                .iter()
374                .filter(|l| l.n > 0) // Active levels
375                .map(|l| Level {
376                    px: Decimal::from_str(&l.px).unwrap_or_default(),
377                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
378                })
379                .collect();
380
381            let asks = data.levels[1]
382                .iter()
383                .filter(|l| l.n > 0) // Active levels
384                .map(|l| Level {
385                    px: Decimal::from_str(&l.px).unwrap_or_default(),
386                    qty: Decimal::from_str(&l.sz).unwrap_or_default(),
387                })
388                .collect();
389
390            WsInbound::L2Book(WsBook {
391                instrument: data.coin,
392                is_snapshot: true, // Hyperliquid sends snapshots
393                seq: Some(data.time),
394                checksum: None,
395                bids,
396                asks,
397                ts: data.time as i64,
398            })
399        }
400        HyperliquidWsMessage::Bbo { data } => {
401            // Access bid and ask from the bbo array: [bid, ask]
402            let default_level = WsLevelData {
403                px: "0".to_string(),
404                sz: "0".to_string(),
405                n: 0,
406            };
407            let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
408            let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
409
410            WsInbound::Bbo(WsBbo {
411                instrument: data.coin,
412                bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
413                bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
414                ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
415                ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
416                ts: data.time as i64,
417            })
418        }
419        HyperliquidWsMessage::Candle { data } => {
420            let candle = WsCandle {
421                instrument: data.s,
422                interval: data.i.clone(),
423                open_ts: data.t as i64,
424                o: Decimal::from_str(&data.o).unwrap_or_default(),
425                h: Decimal::from_str(&data.h).unwrap_or_default(),
426                l: Decimal::from_str(&data.l).unwrap_or_default(),
427                c: Decimal::from_str(&data.c).unwrap_or_default(),
428                v: Decimal::from_str(&data.v).unwrap_or_default(),
429            };
430            WsInbound::Candle(vec![candle])
431        }
432        HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
433            code: None,
434            msg: Some(data.notification.clone()),
435        }),
436        HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
437        _ => WsInbound::Unknown,
438    }
439}