1use std::str::FromStr;
17
18use rust_decimal::Decimal;
19use serde::{Deserialize, Deserializer, Serialize, Serializer};
20use ustr::Ustr;
21
22use crate::{
23 common::enums::{
24 HyperliquidBarInterval::{self, OneMinute},
25 HyperliquidSide,
26 },
27 websocket::{
28 HyperliquidWsChannel, HyperliquidWsError,
29 messages::{
30 HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
31 WsLevelData,
32 },
33 },
34};
35
36#[derive(Debug, Default)]
41pub struct HyperliquidCodec;
42
43impl HyperliquidCodec {
44 pub fn new() -> Self {
46 Self
47 }
48
49 pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
51 if url.starts_with("ws://") || url.starts_with("wss://") {
52 Ok(())
53 } else {
54 Err(HyperliquidWsError::UrlParsing(format!(
55 "URL must start with ws:// or wss://, was: {}",
56 url
57 )))
58 }
59 }
60
61 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
63 serde_json::to_vec(request).map_err(|e| {
64 HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
65 })
66 }
67
68 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
70 serde_json::from_slice(data).map_err(|e| {
71 HyperliquidWsError::MessageDeserialization(format!(
72 "Failed to deserialize message: {}",
73 e
74 ))
75 })
76 }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81#[serde(tag = "op", rename_all = "camelCase")]
82pub enum WsOutbound {
83 Subscribe {
84 args: Vec<SubArg>,
85 id: Option<String>,
86 },
87 Unsubscribe {
88 args: Vec<SubArg>,
89 id: Option<String>,
90 },
91 Ping,
92 Post {
93 id: String,
94 path: String,
95 body: serde_json::Value,
96 },
97 Auth {
98 payload: serde_json::Value,
99 },
100}
101
/// Alias of [`SubArg`].
pub type SubRequest = SubArg;
/// Alias of [`Side`].
pub type TradeSide = Side;
105
106#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
107pub struct SubArg {
108 pub channel: HyperliquidWsChannel,
109 #[serde(default)]
110 pub symbol: Option<Ustr>, #[serde(default)]
112 pub params: Option<serde_json::Value>, }
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
118pub enum WsInbound {
119 Trades(Vec<WsTrade>),
120 L2Book(WsBook),
121 Bbo(WsBbo),
122 Candle(Vec<WsCandle>),
123 AllMids(Vec<WsMid>),
124 UserFills(Vec<WsFill>),
125 UserFundings(Vec<WsFunding>),
126 UserEvents(Vec<WsUserEvent>),
127
128 SubscriptionResponse(SubResp),
129 Pong(Option<i64>),
130 Notification(Notice),
131 Post(PostAck),
132
133 #[serde(other)]
134 Unknown,
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct SubResp {
139 pub ok: bool,
140 pub id: Option<String>,
141 pub message: Option<String>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct Notice {
146 pub code: Option<String>,
147 pub msg: Option<String>,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct PostAck {
152 pub id: String,
153 pub ok: bool,
154 pub error: Option<String>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct WsTrade {
159 pub instrument: Ustr,
160 #[serde(
161 serialize_with = "serialize_decimal",
162 deserialize_with = "deserialize_decimal"
163 )]
164 pub px: Decimal,
165 #[serde(
166 serialize_with = "serialize_decimal",
167 deserialize_with = "deserialize_decimal"
168 )]
169 pub qty: Decimal,
170 pub side: Side,
171 pub ts: i64, pub id: Option<String>,
173}
174
175#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
176#[serde(rename_all = "lowercase")]
177pub enum Side {
178 Buy,
179 Sell,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct WsBbo {
184 pub instrument: Ustr,
185 #[serde(
186 serialize_with = "serialize_decimal",
187 deserialize_with = "deserialize_decimal"
188 )]
189 pub bid_px: Decimal,
190 #[serde(
191 serialize_with = "serialize_decimal",
192 deserialize_with = "deserialize_decimal"
193 )]
194 pub bid_qty: Decimal,
195 #[serde(
196 serialize_with = "serialize_decimal",
197 deserialize_with = "deserialize_decimal"
198 )]
199 pub ask_px: Decimal,
200 #[serde(
201 serialize_with = "serialize_decimal",
202 deserialize_with = "deserialize_decimal"
203 )]
204 pub ask_qty: Decimal,
205 pub ts: i64, }
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct WsCandle {
210 pub instrument: Ustr,
211 pub interval: HyperliquidBarInterval,
212 pub open_ts: i64,
213 #[serde(
214 serialize_with = "serialize_decimal",
215 deserialize_with = "deserialize_decimal"
216 )]
217 pub o: Decimal,
218 #[serde(
219 serialize_with = "serialize_decimal",
220 deserialize_with = "deserialize_decimal"
221 )]
222 pub h: Decimal,
223 #[serde(
224 serialize_with = "serialize_decimal",
225 deserialize_with = "deserialize_decimal"
226 )]
227 pub l: Decimal,
228 #[serde(
229 serialize_with = "serialize_decimal",
230 deserialize_with = "deserialize_decimal"
231 )]
232 pub c: Decimal,
233 #[serde(
234 serialize_with = "serialize_decimal",
235 deserialize_with = "deserialize_decimal"
236 )]
237 pub v: Decimal,
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct WsBook {
242 pub instrument: Ustr,
243 pub is_snapshot: bool,
244 pub seq: Option<u64>,
245 pub checksum: Option<u32>,
246 pub bids: Vec<Level>,
247 pub asks: Vec<Level>,
248 pub ts: i64, }
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct Level {
253 #[serde(
254 serialize_with = "serialize_decimal",
255 deserialize_with = "deserialize_decimal"
256 )]
257 pub px: Decimal,
258 #[serde(
259 serialize_with = "serialize_decimal",
260 deserialize_with = "deserialize_decimal"
261 )]
262 pub qty: Decimal,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct WsMid {
267 pub symbol: String,
268 #[serde(
269 serialize_with = "serialize_decimal",
270 deserialize_with = "deserialize_decimal"
271 )]
272 pub mid: Decimal,
273 pub ts: Option<i64>,
274}
275
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct WsFill {
278 pub symbol: String,
279 pub order_id: String,
280 pub trade_id: String,
281 #[serde(
282 serialize_with = "serialize_decimal",
283 deserialize_with = "deserialize_decimal"
284 )]
285 pub px: Decimal,
286 #[serde(
287 serialize_with = "serialize_decimal",
288 deserialize_with = "deserialize_decimal"
289 )]
290 pub qty: Decimal,
291 pub side: Side,
292 pub ts: i64,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct WsFunding {
297 pub symbol: String,
298 #[serde(
299 serialize_with = "serialize_decimal",
300 deserialize_with = "deserialize_decimal"
301 )]
302 pub rate: Decimal,
303 pub ts: i64,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct WsUserEvent {
308 pub event_type: String,
309 pub data: serde_json::Value,
310 pub ts: i64,
311}
312
313fn serialize_decimal<S: Serializer>(d: &Decimal, s: S) -> Result<S::Ok, S::Error> {
314 s.serialize_str(&d.normalize().to_string())
315}
316
317fn deserialize_decimal<'de, D: Deserializer<'de>>(d: D) -> Result<Decimal, D::Error> {
318 let v = serde_json::Value::deserialize(d)?;
319 match v {
320 serde_json::Value::String(s) => Decimal::from_str(&s).map_err(serde::de::Error::custom),
321 serde_json::Value::Number(n) => {
322 Decimal::from_str(&n.to_string()).map_err(serde::de::Error::custom)
323 }
324 _ => Err(serde::de::Error::custom(
325 "expected decimal string or number",
326 )),
327 }
328}
329
330pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
332 match msg {
333 WsOutbound::Subscribe { args, id: _ } => {
334 if let Some(arg) = args.first() {
336 let subscription = match arg.channel {
337 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
338 coin: arg.symbol.unwrap_or_default(),
339 },
340 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
341 coin: arg.symbol.unwrap_or_default(),
342 n_sig_figs: arg
343 .params
344 .as_ref()
345 .and_then(|p| p.get("nSigFigs"))
346 .and_then(|v| v.as_u64())
347 .map(|u| u as u32),
348 mantissa: arg
349 .params
350 .as_ref()
351 .and_then(|p| p.get("mantissa"))
352 .and_then(|v| v.as_u64())
353 .map(|u| u as u32),
354 },
355 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
356 coin: arg.symbol.unwrap_or_default(),
357 },
358 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
359 coin: arg.symbol.unwrap_or_default(),
360 interval: arg
361 .params
362 .as_ref()
363 .and_then(|p| p.get("interval"))
364 .and_then(|v| v.as_str())
365 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
366 .unwrap_or(OneMinute),
367 },
368 HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
369 dex: arg
370 .params
371 .as_ref()
372 .and_then(|p| p.get("dex"))
373 .and_then(|v| v.as_str())
374 .map(|s| s.to_string()),
375 },
376 HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
377 user: arg
378 .params
379 .as_ref()
380 .and_then(|p| p.get("user"))
381 .and_then(|v| v.as_str())
382 .unwrap_or_default()
383 .to_string(),
384 },
385 _ => SubscriptionRequest::AllMids { dex: None }, };
387
388 HyperliquidWsRequest::Subscribe { subscription }
389 } else {
390 HyperliquidWsRequest::Ping }
392 }
393 WsOutbound::Unsubscribe { args, id: _ } => {
394 if let Some(arg) = args.first() {
395 let subscription = match arg.channel {
396 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
397 coin: arg.symbol.unwrap_or_default(),
398 },
399 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
400 coin: arg.symbol.unwrap_or_default(),
401 n_sig_figs: None,
402 mantissa: None,
403 },
404 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
405 coin: arg.symbol.unwrap_or_default(),
406 },
407 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
408 coin: arg.symbol.unwrap_or_default(),
409 interval: arg
410 .params
411 .as_ref()
412 .and_then(|p| p.get("interval"))
413 .and_then(|v| v.as_str())
414 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
415 .unwrap_or(OneMinute),
416 },
417 _ => SubscriptionRequest::AllMids { dex: None },
418 };
419
420 HyperliquidWsRequest::Unsubscribe { subscription }
421 } else {
422 HyperliquidWsRequest::Ping
423 }
424 }
425 WsOutbound::Ping => HyperliquidWsRequest::Ping,
426 WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
427 id: id.parse::<u64>().unwrap_or(1),
428 request: PostRequest::Info {
429 payload: body.clone(),
430 },
431 },
432 WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
433 id: 1,
434 request: PostRequest::Info {
435 payload: payload.clone(),
436 }, },
438 }
439}
440
441pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
443 match msg {
444 HyperliquidWsMessage::SubscriptionResponse { data } => {
445 WsInbound::SubscriptionResponse(SubResp {
446 ok: true,
447 id: None,
448 message: Some(format!("Subscribed to {:?}", data)),
449 })
450 }
451 HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
452 id: data.id.to_string(),
453 ok: true,
454 error: None,
455 }),
456 HyperliquidWsMessage::Trades { data } => {
457 let trades = data
458 .iter()
459 .map(|t| WsTrade {
460 instrument: t.coin,
461 px: Decimal::from_str(&t.px).unwrap_or_default(),
462 qty: Decimal::from_str(&t.sz).unwrap_or_default(),
463 side: match t.side {
464 HyperliquidSide::Sell => Side::Sell,
465 HyperliquidSide::Buy => Side::Buy,
466 },
467 ts: t.time as i64,
468 id: Some(t.tid.to_string()),
469 })
470 .collect();
471 WsInbound::Trades(trades)
472 }
473 HyperliquidWsMessage::L2Book { data } => {
474 let bids = data.levels[0]
475 .iter()
476 .filter(|l| l.n > 0) .map(|l| Level {
478 px: Decimal::from_str(&l.px).unwrap_or_default(),
479 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
480 })
481 .collect();
482
483 let asks = data.levels[1]
484 .iter()
485 .filter(|l| l.n > 0) .map(|l| Level {
487 px: Decimal::from_str(&l.px).unwrap_or_default(),
488 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
489 })
490 .collect();
491
492 WsInbound::L2Book(WsBook {
493 instrument: data.coin,
494 is_snapshot: true, seq: Some(data.time),
496 checksum: None,
497 bids,
498 asks,
499 ts: data.time as i64,
500 })
501 }
502 HyperliquidWsMessage::Bbo { data } => {
503 let default_level = WsLevelData {
505 px: "0".to_string(),
506 sz: "0".to_string(),
507 n: 0,
508 };
509 let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
510 let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
511
512 WsInbound::Bbo(WsBbo {
513 instrument: data.coin,
514 bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
515 bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
516 ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
517 ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
518 ts: data.time as i64,
519 })
520 }
521 HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
522 Ok(interval) => {
523 let candle = WsCandle {
524 instrument: data.s,
525 interval,
526 open_ts: data.t as i64,
527 o: Decimal::from_str(&data.o).unwrap_or_default(),
528 h: Decimal::from_str(&data.h).unwrap_or_default(),
529 l: Decimal::from_str(&data.l).unwrap_or_default(),
530 c: Decimal::from_str(&data.c).unwrap_or_default(),
531 v: Decimal::from_str(&data.v).unwrap_or_default(),
532 };
533 WsInbound::Candle(vec![candle])
534 }
535 Err(e) => {
536 tracing::error!("Failed to parse candle interval '{}': {}", data.i, e);
537 WsInbound::Unknown
538 }
539 },
540 HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
541 code: None,
542 msg: Some(data.notification.clone()),
543 }),
544 HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
545 _ => WsInbound::Unknown,
546 }
547}