1use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use ustr::Ustr;
21
22use crate::{
23 common::enums::{
24 HyperliquidBarInterval::{self, OneMinute},
25 HyperliquidSide,
26 },
27 websocket::{
28 HyperliquidWsChannel, HyperliquidWsError,
29 messages::{
30 HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
31 WsLevelData,
32 },
33 },
34};
35
/// Stateless codec that converts between raw WebSocket payload bytes and the
/// typed Hyperliquid request/message structures using JSON.
///
/// The type carries no fields, so every instance is interchangeable.
#[derive(Debug, Default)]
pub struct HyperliquidCodec;
42
43impl HyperliquidCodec {
44 pub fn new() -> Self {
46 Self
47 }
48
49 pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
51 if url.starts_with("ws://") || url.starts_with("wss://") {
52 Ok(())
53 } else {
54 Err(HyperliquidWsError::UrlParsing(format!(
55 "URL must start with ws:// or wss://, was: {url}"
56 )))
57 }
58 }
59
60 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
62 serde_json::to_vec(request).map_err(|e| {
63 HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
64 })
65 }
66
67 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
69 serde_json::from_slice(data).map_err(|e| {
70 HyperliquidWsError::MessageDeserialization(format!(
71 "Failed to deserialize message: {e}"
72 ))
73 })
74 }
75}
76
/// Normalized outbound WebSocket operation.
///
/// Serialized as an internally tagged enum: the variant is carried in an `op`
/// field, with variant names rendered in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "camelCase")]
pub enum WsOutbound {
    /// Request to subscribe to one or more channels.
    Subscribe {
        /// Subscription arguments (channel plus optional symbol/params).
        args: Vec<SubArg>,
        /// Optional caller-supplied correlation id.
        id: Option<String>,
    },
    /// Request to unsubscribe from one or more channels.
    Unsubscribe {
        /// Subscription arguments identifying what to drop.
        args: Vec<SubArg>,
        /// Optional caller-supplied correlation id.
        id: Option<String>,
    },
    /// Keep-alive ping.
    Ping,
    /// Generic post request.
    Post {
        /// Correlation id, kept as a string at this layer.
        id: String,
        /// Request path.
        path: String,
        /// Arbitrary JSON body.
        body: serde_json::Value,
    },
    /// Authentication request.
    Auth {
        /// Arbitrary JSON authentication payload.
        payload: serde_json::Value,
    },
}
99
/// Alias for [`SubArg`].
pub type SubRequest = SubArg;
/// Alias for [`Side`].
pub type TradeSide = Side;
103
/// A single subscription argument: the channel plus optional qualifiers.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubArg {
    /// Channel being subscribed to or unsubscribed from.
    pub channel: HyperliquidWsChannel,
    /// Optional symbol; defaults to `None` when absent from the input.
    #[serde(default)]
    pub symbol: Option<Ustr>,
    /// Optional channel-specific parameters as free-form JSON; defaults to
    /// `None` when absent. `encode_outbound` reads keys such as `nSigFigs`,
    /// `mantissa`, `interval`, `dex` and `user` from this value.
    #[serde(default)]
    pub params: Option<serde_json::Value>,
}
112
/// Normalized inbound WebSocket message.
///
/// Serialized as an adjacently tagged enum: the variant name (camelCase) is
/// carried in `channel` and the payload in `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
pub enum WsInbound {
    /// Batch of trades.
    Trades(Vec<WsTrade>),
    /// Order book message.
    L2Book(WsBook),
    /// Best bid/offer message.
    Bbo(WsBbo),
    /// Batch of candles.
    Candle(Vec<WsCandle>),
    /// Batch of mid prices.
    AllMids(Vec<WsMid>),
    /// Batch of user fills.
    UserFills(Vec<WsFill>),
    /// Batch of user funding entries.
    UserFundings(Vec<WsFunding>),
    /// Batch of generic user events.
    UserEvents(Vec<WsUserEvent>),

    /// Acknowledgement of a subscribe/unsubscribe request.
    SubscriptionResponse(SubResp),
    /// Pong reply with an optional timestamp; `decode_inbound` fills it with
    /// the local receive time in milliseconds.
    Pong(Option<i64>),
    /// Notification message.
    Notification(Notice),
    /// Acknowledgement of a post request.
    Post(PostAck),

    /// Fallback for any `channel` tag that matches no other variant.
    #[serde(other)]
    Unknown,
}
134
/// Outcome of a subscribe/unsubscribe request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubResp {
    /// Whether the request succeeded.
    pub ok: bool,
    /// Optional correlation id of the originating request.
    pub id: Option<String>,
    /// Optional human-readable detail.
    pub message: Option<String>,
}
141
/// Notification payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notice {
    /// Optional notification code; `decode_inbound` always leaves this `None`.
    pub code: Option<String>,
    /// Optional notification text.
    pub msg: Option<String>,
}
147
/// Acknowledgement of a post request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostAck {
    /// Correlation id of the request, as a string.
    pub id: String,
    /// Whether the request succeeded.
    pub ok: bool,
    /// Optional error description.
    pub error: Option<String>,
}
154
/// A single normalized trade.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsTrade {
    /// Instrument identifier (`decode_inbound` copies the source `coin`).
    pub instrument: Ustr,
    /// Trade price; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Trade quantity; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
    /// Trade side.
    pub side: Side,
    /// Trade timestamp, copied from the source `time` field
    /// (presumably epoch milliseconds — TODO confirm).
    pub ts: i64,
    /// Optional trade id (`decode_inbound` stringifies the source `tid`).
    pub id: Option<String>,
}
172
/// Order/trade side, serialized in lowercase (`"buy"` / `"sell"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Side {
    /// Buy side.
    Buy,
    /// Sell side.
    Sell,
}
179
/// Normalized best bid/offer quote.
///
/// All price/quantity fields are (de)serialized through the decimal helpers
/// in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsBbo {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Best bid price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub bid_px: Decimal,
    /// Best bid quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub bid_qty: Decimal,
    /// Best ask price.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub ask_px: Decimal,
    /// Best ask quantity.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub ask_qty: Decimal,
    /// Quote timestamp, copied from the source `time` field
    /// (presumably epoch milliseconds — TODO confirm).
    pub ts: i64,
}
205
/// Normalized candle (bar).
///
/// The single-letter fields are (de)serialized through the decimal helpers in
/// this file; `decode_inbound` fills them from the same-named source fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsCandle {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Bar interval.
    pub interval: HyperliquidBarInterval,
    /// Bar open timestamp (presumably epoch milliseconds — TODO confirm).
    pub open_ts: i64,
    /// Open price (presumably; named after the source `o` field).
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub o: Decimal,
    /// High price (presumably; named after the source `h` field).
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub h: Decimal,
    /// Low price (presumably; named after the source `l` field).
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub l: Decimal,
    /// Close price (presumably; named after the source `c` field).
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub c: Decimal,
    /// Volume (presumably; named after the source `v` field).
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub v: Decimal,
}
237
/// Normalized order book message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsBook {
    /// Instrument identifier.
    pub instrument: Ustr,
    /// Whether this message is a full snapshot; `decode_inbound` always sets `true`.
    pub is_snapshot: bool,
    /// Optional sequence number; `decode_inbound` reuses the source `time` value here.
    pub seq: Option<u64>,
    /// Optional checksum; `decode_inbound` always leaves this `None`.
    pub checksum: Option<u32>,
    /// Bid levels.
    pub bids: Vec<Level>,
    /// Ask levels.
    pub asks: Vec<Level>,
    /// Book timestamp, copied from the source `time` field
    /// (presumably epoch milliseconds — TODO confirm).
    pub ts: i64,
}
248
/// A single price level of an order book.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Level {
    /// Level price; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Level quantity; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
}
262
/// Mid price for one symbol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsMid {
    /// Symbol the mid price refers to.
    pub symbol: String,
    /// Mid price; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub mid: Decimal,
    /// Optional timestamp (units not established in this file — TODO confirm).
    pub ts: Option<i64>,
}
273
/// A user fill.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsFill {
    /// Symbol that was filled.
    pub symbol: String,
    /// Identifier of the order that was filled.
    pub order_id: String,
    /// Identifier of the trade that produced the fill.
    pub trade_id: String,
    /// Fill price; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub px: Decimal,
    /// Fill quantity; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub qty: Decimal,
    /// Side of the fill.
    pub side: Side,
    /// Fill timestamp (units not established in this file — TODO confirm).
    pub ts: i64,
}
292
/// A user funding entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsFunding {
    /// Symbol the funding applies to.
    pub symbol: String,
    /// Funding rate; (de)serialized through the decimal helpers in this file.
    #[serde(
        serialize_with = "serialize_decimal",
        deserialize_with = "deserialize_decimal"
    )]
    pub rate: Decimal,
    /// Funding timestamp (units not established in this file — TODO confirm).
    pub ts: i64,
}
303
/// A generic user event with a free-form payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsUserEvent {
    /// Event type discriminator, kept as a plain string.
    pub event_type: String,
    /// Event payload as untyped JSON.
    pub data: serde_json::Value,
    /// Event timestamp (units not established in this file — TODO confirm).
    pub ts: i64,
}
310
311fn serialize_decimal<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
312 s.serialize_str(&d.normalize().to_string())
313}
314
315fn deserialize_decimal<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
316 let v = serde_json::Value::deserialize(d)?;
317 match v {
318 serde_json::Value::String(s) => Decimal::from_str(&s).map_err(serde::de::Error::custom),
319 serde_json::Value::Number(n) => {
320 Decimal::from_str(&n.to_string()).map_err(serde::de::Error::custom)
321 }
322 _ => Err(serde::de::Error::custom(
323 "expected decimal string or number",
324 )),
325 }
326}
327
328pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
330 match msg {
331 WsOutbound::Subscribe { args, id: _ } => {
332 if let Some(arg) = args.first() {
334 let subscription = match arg.channel {
335 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
336 coin: arg.symbol.unwrap_or_default(),
337 },
338 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
339 coin: arg.symbol.unwrap_or_default(),
340 n_sig_figs: arg
341 .params
342 .as_ref()
343 .and_then(|p| p.get("nSigFigs"))
344 .and_then(|v| v.as_u64())
345 .map(|u| u as u32),
346 mantissa: arg
347 .params
348 .as_ref()
349 .and_then(|p| p.get("mantissa"))
350 .and_then(|v| v.as_u64())
351 .map(|u| u as u32),
352 },
353 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
354 coin: arg.symbol.unwrap_or_default(),
355 },
356 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
357 coin: arg.symbol.unwrap_or_default(),
358 interval: arg
359 .params
360 .as_ref()
361 .and_then(|p| p.get("interval"))
362 .and_then(|v| v.as_str())
363 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
364 .unwrap_or(OneMinute),
365 },
366 HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
367 dex: arg
368 .params
369 .as_ref()
370 .and_then(|p| p.get("dex"))
371 .and_then(|v| v.as_str())
372 .map(|s| s.to_string()),
373 },
374 HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
375 user: arg
376 .params
377 .as_ref()
378 .and_then(|p| p.get("user"))
379 .and_then(|v| v.as_str())
380 .unwrap_or_default()
381 .to_string(),
382 },
383 _ => SubscriptionRequest::AllMids { dex: None }, };
385
386 HyperliquidWsRequest::Subscribe { subscription }
387 } else {
388 HyperliquidWsRequest::Ping }
390 }
391 WsOutbound::Unsubscribe { args, id: _ } => {
392 if let Some(arg) = args.first() {
393 let subscription = match arg.channel {
394 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
395 coin: arg.symbol.unwrap_or_default(),
396 },
397 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
398 coin: arg.symbol.unwrap_or_default(),
399 n_sig_figs: None,
400 mantissa: None,
401 },
402 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
403 coin: arg.symbol.unwrap_or_default(),
404 },
405 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
406 coin: arg.symbol.unwrap_or_default(),
407 interval: arg
408 .params
409 .as_ref()
410 .and_then(|p| p.get("interval"))
411 .and_then(|v| v.as_str())
412 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
413 .unwrap_or(OneMinute),
414 },
415 _ => SubscriptionRequest::AllMids { dex: None },
416 };
417
418 HyperliquidWsRequest::Unsubscribe { subscription }
419 } else {
420 HyperliquidWsRequest::Ping
421 }
422 }
423 WsOutbound::Ping => HyperliquidWsRequest::Ping,
424 WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
425 id: id.parse::<u64>().unwrap_or(1),
426 request: PostRequest::Info {
427 payload: body.clone(),
428 },
429 },
430 WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
431 id: 1,
432 request: PostRequest::Info {
433 payload: payload.clone(),
434 }, },
436 }
437}
438
439pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
441 match msg {
442 HyperliquidWsMessage::SubscriptionResponse { data } => {
443 WsInbound::SubscriptionResponse(SubResp {
444 ok: true,
445 id: None,
446 message: Some(format!("Subscribed to {data:?}")),
447 })
448 }
449 HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
450 id: data.id.to_string(),
451 ok: true,
452 error: None,
453 }),
454 HyperliquidWsMessage::Trades { data } => {
455 let trades = data
456 .iter()
457 .map(|t| WsTrade {
458 instrument: t.coin,
459 px: Decimal::from_str(&t.px).unwrap_or_default(),
460 qty: Decimal::from_str(&t.sz).unwrap_or_default(),
461 side: match t.side {
462 HyperliquidSide::Sell => Side::Sell,
463 HyperliquidSide::Buy => Side::Buy,
464 },
465 ts: t.time as i64,
466 id: Some(t.tid.to_string()),
467 })
468 .collect();
469 WsInbound::Trades(trades)
470 }
471 HyperliquidWsMessage::L2Book { data } => {
472 let bids = data.levels[0]
473 .iter()
474 .filter(|l| l.n > 0) .map(|l| Level {
476 px: Decimal::from_str(&l.px).unwrap_or_default(),
477 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
478 })
479 .collect();
480
481 let asks = data.levels[1]
482 .iter()
483 .filter(|l| l.n > 0) .map(|l| Level {
485 px: Decimal::from_str(&l.px).unwrap_or_default(),
486 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
487 })
488 .collect();
489
490 WsInbound::L2Book(WsBook {
491 instrument: data.coin,
492 is_snapshot: true, seq: Some(data.time),
494 checksum: None,
495 bids,
496 asks,
497 ts: data.time as i64,
498 })
499 }
500 HyperliquidWsMessage::Bbo { data } => {
501 let default_level = WsLevelData {
503 px: "0".to_string(),
504 sz: "0".to_string(),
505 n: 0,
506 };
507 let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
508 let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
509
510 WsInbound::Bbo(WsBbo {
511 instrument: data.coin,
512 bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
513 bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
514 ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
515 ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
516 ts: data.time as i64,
517 })
518 }
519 HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
520 Ok(interval) => {
521 let candle = WsCandle {
522 instrument: data.s,
523 interval,
524 open_ts: data.t as i64,
525 o: Decimal::from_str(&data.o).unwrap_or_default(),
526 h: Decimal::from_str(&data.h).unwrap_or_default(),
527 l: Decimal::from_str(&data.l).unwrap_or_default(),
528 c: Decimal::from_str(&data.c).unwrap_or_default(),
529 v: Decimal::from_str(&data.v).unwrap_or_default(),
530 };
531 WsInbound::Candle(vec![candle])
532 }
533 Err(e) => {
534 tracing::error!("Failed to parse candle interval '{}': {}", data.i, e);
535 WsInbound::Unknown
536 }
537 },
538 HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
539 code: None,
540 msg: Some(data.notification.clone()),
541 }),
542 HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
543 _ => WsInbound::Unknown,
544 }
545}