1use std::str::FromStr;
17
18use nautilus_core::serialization::{
19 deserialize_decimal, serialize_decimal_as_str as serialize_decimal,
20};
21use rust_decimal::Decimal;
22use serde::{Deserialize, Serialize};
23use ustr::Ustr;
24
25use crate::{
26 common::enums::{
27 HyperliquidBarInterval::{self, OneMinute},
28 HyperliquidSide,
29 },
30 websocket::{
31 HyperliquidWsChannel, HyperliquidWsError,
32 messages::{
33 HyperliquidWsMessage, HyperliquidWsRequest, PostRequest, SubscriptionRequest,
34 WsLevelData,
35 },
36 },
37};
38
39#[derive(Debug, Default)]
44pub struct HyperliquidCodec;
45
46impl HyperliquidCodec {
47 pub fn new() -> Self {
49 Self
50 }
51
52 pub fn validate_url(url: &str) -> Result<(), HyperliquidWsError> {
54 if url.starts_with("ws://") || url.starts_with("wss://") {
55 Ok(())
56 } else {
57 Err(HyperliquidWsError::UrlParsing(format!(
58 "URL must start with ws:// or wss://, was: {url}"
59 )))
60 }
61 }
62
63 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidWsError> {
65 serde_json::to_vec(request).map_err(|e| {
66 HyperliquidWsError::MessageSerialization(format!("Failed to serialize request: {e}"))
67 })
68 }
69
70 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidWsError> {
72 serde_json::from_slice(data).map_err(|e| {
73 HyperliquidWsError::MessageDeserialization(format!(
74 "Failed to deserialize message: {e}"
75 ))
76 })
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(tag = "op", rename_all = "camelCase")]
83pub enum WsOutbound {
84 Subscribe {
85 args: Vec<SubArg>,
86 id: Option<String>,
87 },
88 Unsubscribe {
89 args: Vec<SubArg>,
90 id: Option<String>,
91 },
92 Ping,
93 Post {
94 id: String,
95 path: String,
96 body: serde_json::Value,
97 },
98 Auth {
99 payload: serde_json::Value,
100 },
101}
102
103pub type SubRequest = SubArg;
105pub type TradeSide = Side;
106
107#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
108pub struct SubArg {
109 pub channel: HyperliquidWsChannel,
110 #[serde(default)]
111 pub symbol: Option<Ustr>, #[serde(default)]
113 pub params: Option<serde_json::Value>, }
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(tag = "channel", content = "data", rename_all = "camelCase")]
119pub enum WsInbound {
120 Trades(Vec<WsTrade>),
121 L2Book(WsBook),
122 Bbo(WsBbo),
123 Candle(Vec<WsCandle>),
124 AllMids(Vec<WsMid>),
125 UserFills(Vec<WsFill>),
126 UserFundings(Vec<WsFunding>),
127 UserEvents(Vec<WsUserEvent>),
128 SubscriptionResponse(SubResp),
129 Pong(Option<i64>),
130 Notification(Notice),
131 Post(PostAck),
132 #[serde(other)]
133 Unknown,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct SubResp {
138 pub ok: bool,
139 pub id: Option<String>,
140 pub message: Option<String>,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct Notice {
145 pub code: Option<String>,
146 pub msg: Option<String>,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct PostAck {
151 pub id: String,
152 pub ok: bool,
153 pub error: Option<String>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct WsTrade {
158 pub instrument: Ustr,
159 #[serde(
160 serialize_with = "serialize_decimal",
161 deserialize_with = "deserialize_decimal"
162 )]
163 pub px: Decimal,
164 #[serde(
165 serialize_with = "serialize_decimal",
166 deserialize_with = "deserialize_decimal"
167 )]
168 pub qty: Decimal,
169 pub side: Side,
170 pub ts: i64, pub id: Option<String>,
172}
173
174#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
175#[serde(rename_all = "lowercase")]
176pub enum Side {
177 Buy,
178 Sell,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct WsBbo {
183 pub instrument: Ustr,
184 #[serde(
185 serialize_with = "serialize_decimal",
186 deserialize_with = "deserialize_decimal"
187 )]
188 pub bid_px: Decimal,
189 #[serde(
190 serialize_with = "serialize_decimal",
191 deserialize_with = "deserialize_decimal"
192 )]
193 pub bid_qty: Decimal,
194 #[serde(
195 serialize_with = "serialize_decimal",
196 deserialize_with = "deserialize_decimal"
197 )]
198 pub ask_px: Decimal,
199 #[serde(
200 serialize_with = "serialize_decimal",
201 deserialize_with = "deserialize_decimal"
202 )]
203 pub ask_qty: Decimal,
204 pub ts: i64, }
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct WsCandle {
209 pub instrument: Ustr,
210 pub interval: HyperliquidBarInterval,
211 pub open_ts: i64,
212 #[serde(
213 serialize_with = "serialize_decimal",
214 deserialize_with = "deserialize_decimal"
215 )]
216 pub o: Decimal,
217 #[serde(
218 serialize_with = "serialize_decimal",
219 deserialize_with = "deserialize_decimal"
220 )]
221 pub h: Decimal,
222 #[serde(
223 serialize_with = "serialize_decimal",
224 deserialize_with = "deserialize_decimal"
225 )]
226 pub l: Decimal,
227 #[serde(
228 serialize_with = "serialize_decimal",
229 deserialize_with = "deserialize_decimal"
230 )]
231 pub c: Decimal,
232 #[serde(
233 serialize_with = "serialize_decimal",
234 deserialize_with = "deserialize_decimal"
235 )]
236 pub v: Decimal,
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct WsBook {
241 pub instrument: Ustr,
242 pub is_snapshot: bool,
243 pub seq: Option<u64>,
244 pub checksum: Option<u32>,
245 pub bids: Vec<Level>,
246 pub asks: Vec<Level>,
247 pub ts: i64, }
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct Level {
252 #[serde(
253 serialize_with = "serialize_decimal",
254 deserialize_with = "deserialize_decimal"
255 )]
256 pub px: Decimal,
257 #[serde(
258 serialize_with = "serialize_decimal",
259 deserialize_with = "deserialize_decimal"
260 )]
261 pub qty: Decimal,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
265pub struct WsMid {
266 pub symbol: String,
267 #[serde(
268 serialize_with = "serialize_decimal",
269 deserialize_with = "deserialize_decimal"
270 )]
271 pub mid: Decimal,
272 pub ts: Option<i64>,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub struct WsFill {
277 pub symbol: String,
278 pub order_id: String,
279 pub trade_id: String,
280 #[serde(
281 serialize_with = "serialize_decimal",
282 deserialize_with = "deserialize_decimal"
283 )]
284 pub px: Decimal,
285 #[serde(
286 serialize_with = "serialize_decimal",
287 deserialize_with = "deserialize_decimal"
288 )]
289 pub qty: Decimal,
290 pub side: Side,
291 pub ts: i64,
292}
293
294#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct WsFunding {
296 pub symbol: String,
297 #[serde(
298 serialize_with = "serialize_decimal",
299 deserialize_with = "deserialize_decimal"
300 )]
301 pub rate: Decimal,
302 pub ts: i64,
303}
304
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct WsUserEvent {
307 pub event_type: String,
308 pub data: serde_json::Value,
309 pub ts: i64,
310}
311
312pub fn encode_outbound(msg: &WsOutbound) -> HyperliquidWsRequest {
314 match msg {
315 WsOutbound::Subscribe { args, id: _ } => {
316 if let Some(arg) = args.first() {
318 let subscription = match arg.channel {
319 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
320 coin: arg.symbol.unwrap_or_default(),
321 },
322 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
323 coin: arg.symbol.unwrap_or_default(),
324 n_sig_figs: arg
325 .params
326 .as_ref()
327 .and_then(|p| p.get("nSigFigs"))
328 .and_then(|v| v.as_u64())
329 .map(|u| u as u32),
330 mantissa: arg
331 .params
332 .as_ref()
333 .and_then(|p| p.get("mantissa"))
334 .and_then(|v| v.as_u64())
335 .map(|u| u as u32),
336 },
337 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
338 coin: arg.symbol.unwrap_or_default(),
339 },
340 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
341 coin: arg.symbol.unwrap_or_default(),
342 interval: arg
343 .params
344 .as_ref()
345 .and_then(|p| p.get("interval"))
346 .and_then(|v| v.as_str())
347 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
348 .unwrap_or(OneMinute),
349 },
350 HyperliquidWsChannel::AllMids => SubscriptionRequest::AllMids {
351 dex: arg
352 .params
353 .as_ref()
354 .and_then(|p| p.get("dex"))
355 .and_then(|v| v.as_str())
356 .map(|s| s.to_string()),
357 },
358 HyperliquidWsChannel::Notification => SubscriptionRequest::Notification {
359 user: arg
360 .params
361 .as_ref()
362 .and_then(|p| p.get("user"))
363 .and_then(|v| v.as_str())
364 .unwrap_or_default()
365 .to_string(),
366 },
367 _ => SubscriptionRequest::AllMids { dex: None }, };
369
370 HyperliquidWsRequest::Subscribe { subscription }
371 } else {
372 HyperliquidWsRequest::Ping }
374 }
375 WsOutbound::Unsubscribe { args, id: _ } => {
376 if let Some(arg) = args.first() {
377 let subscription = match arg.channel {
378 HyperliquidWsChannel::Trades => SubscriptionRequest::Trades {
379 coin: arg.symbol.unwrap_or_default(),
380 },
381 HyperliquidWsChannel::L2Book => SubscriptionRequest::L2Book {
382 coin: arg.symbol.unwrap_or_default(),
383 n_sig_figs: None,
384 mantissa: None,
385 },
386 HyperliquidWsChannel::Bbo => SubscriptionRequest::Bbo {
387 coin: arg.symbol.unwrap_or_default(),
388 },
389 HyperliquidWsChannel::Candle => SubscriptionRequest::Candle {
390 coin: arg.symbol.unwrap_or_default(),
391 interval: arg
392 .params
393 .as_ref()
394 .and_then(|p| p.get("interval"))
395 .and_then(|v| v.as_str())
396 .and_then(|s| s.parse::<HyperliquidBarInterval>().ok())
397 .unwrap_or(OneMinute),
398 },
399 _ => SubscriptionRequest::AllMids { dex: None },
400 };
401
402 HyperliquidWsRequest::Unsubscribe { subscription }
403 } else {
404 HyperliquidWsRequest::Ping
405 }
406 }
407 WsOutbound::Ping => HyperliquidWsRequest::Ping,
408 WsOutbound::Post { id, path: _, body } => HyperliquidWsRequest::Post {
409 id: id.parse::<u64>().unwrap_or(1),
410 request: PostRequest::Info {
411 payload: body.clone(),
412 },
413 },
414 WsOutbound::Auth { payload } => HyperliquidWsRequest::Post {
415 id: 1,
416 request: PostRequest::Info {
417 payload: payload.clone(),
418 }, },
420 }
421}
422
423pub fn decode_inbound(msg: &HyperliquidWsMessage) -> WsInbound {
425 match msg {
426 HyperliquidWsMessage::SubscriptionResponse { data } => {
427 WsInbound::SubscriptionResponse(SubResp {
428 ok: true,
429 id: None,
430 message: Some(format!("Subscribed to {data:?}")),
431 })
432 }
433 HyperliquidWsMessage::Post { data } => WsInbound::Post(PostAck {
434 id: data.id.to_string(),
435 ok: true,
436 error: None,
437 }),
438 HyperliquidWsMessage::Trades { data } => {
439 let trades = data
440 .iter()
441 .map(|t| WsTrade {
442 instrument: t.coin,
443 px: Decimal::from_str(&t.px).unwrap_or_default(),
444 qty: Decimal::from_str(&t.sz).unwrap_or_default(),
445 side: match t.side {
446 HyperliquidSide::Sell => Side::Sell,
447 HyperliquidSide::Buy => Side::Buy,
448 },
449 ts: t.time as i64,
450 id: Some(t.tid.to_string()),
451 })
452 .collect();
453 WsInbound::Trades(trades)
454 }
455 HyperliquidWsMessage::L2Book { data } => {
456 let bids = data.levels[0]
457 .iter()
458 .filter(|l| l.n > 0) .map(|l| Level {
460 px: Decimal::from_str(&l.px).unwrap_or_default(),
461 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
462 })
463 .collect();
464
465 let asks = data.levels[1]
466 .iter()
467 .filter(|l| l.n > 0) .map(|l| Level {
469 px: Decimal::from_str(&l.px).unwrap_or_default(),
470 qty: Decimal::from_str(&l.sz).unwrap_or_default(),
471 })
472 .collect();
473
474 WsInbound::L2Book(WsBook {
475 instrument: data.coin,
476 is_snapshot: true, seq: Some(data.time),
478 checksum: None,
479 bids,
480 asks,
481 ts: data.time as i64,
482 })
483 }
484 HyperliquidWsMessage::Bbo { data } => {
485 let default_level = WsLevelData {
487 px: "0".to_string(),
488 sz: "0".to_string(),
489 n: 0,
490 };
491 let bid = data.bbo[0].as_ref().unwrap_or(&default_level);
492 let ask = data.bbo[1].as_ref().unwrap_or(&default_level);
493
494 WsInbound::Bbo(WsBbo {
495 instrument: data.coin,
496 bid_px: Decimal::from_str(&bid.px).unwrap_or_default(),
497 bid_qty: Decimal::from_str(&bid.sz).unwrap_or_default(),
498 ask_px: Decimal::from_str(&ask.px).unwrap_or_default(),
499 ask_qty: Decimal::from_str(&ask.sz).unwrap_or_default(),
500 ts: data.time as i64,
501 })
502 }
503 HyperliquidWsMessage::Candle { data } => match HyperliquidBarInterval::from_str(&data.i) {
504 Ok(interval) => {
505 let candle = WsCandle {
506 instrument: data.s,
507 interval,
508 open_ts: data.t as i64,
509 o: Decimal::from_str(&data.o).unwrap_or_default(),
510 h: Decimal::from_str(&data.h).unwrap_or_default(),
511 l: Decimal::from_str(&data.l).unwrap_or_default(),
512 c: Decimal::from_str(&data.c).unwrap_or_default(),
513 v: Decimal::from_str(&data.v).unwrap_or_default(),
514 };
515 WsInbound::Candle(vec![candle])
516 }
517 Err(e) => {
518 log::error!("Failed to parse candle interval '{}': {}", data.i, e);
519 WsInbound::Unknown
520 }
521 },
522 HyperliquidWsMessage::Notification { data } => WsInbound::Notification(Notice {
523 code: None,
524 msg: Some(data.notification.clone()),
525 }),
526 HyperliquidWsMessage::Pong => WsInbound::Pong(Some(chrono::Utc::now().timestamp_millis())),
527 _ => WsInbound::Unknown,
528 }
529}