1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{
24 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26 },
27 enums::{
28 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29 TimeInForce,
30 },
31 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport},
34 types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{Decimal, prelude::FromPrimitive};
37
38use super::messages::{
39 CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
40};
41use crate::common::parse::{is_conditional_order_data, millis_to_nanos, parse_trigger_order_type};
42
43fn parse_price(
45 price_str: &str,
46 instrument: &InstrumentAny,
47 field_name: &str,
48) -> anyhow::Result<Price> {
49 let decimal = Decimal::from_str(price_str)
50 .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
51
52 Price::from_decimal_dp(decimal, instrument.price_precision())
53 .with_context(|| format!("Failed to create price from '{price_str}' for {field_name}"))
54}
55
56fn parse_quantity(
58 quantity_str: &str,
59 instrument: &InstrumentAny,
60 field_name: &str,
61) -> anyhow::Result<Quantity> {
62 let decimal = Decimal::from_str(quantity_str).with_context(|| {
63 format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
64 })?;
65
66 Quantity::from_decimal_dp(decimal.abs(), instrument.size_precision()).with_context(|| {
67 format!("Failed to create quantity from '{quantity_str}' for {field_name}")
68 })
69}
70
71pub fn parse_ws_trade_tick(
73 trade: &WsTradeData,
74 instrument: &InstrumentAny,
75 ts_init: UnixNanos,
76) -> anyhow::Result<TradeTick> {
77 let price = parse_price(&trade.px, instrument, "trade.px")?;
78 let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
79 let aggressor = AggressorSide::from(trade.side);
80 let trade_id = TradeId::new_checked(trade.tid.to_string())
81 .context("invalid trade identifier in Hyperliquid trade message")?;
82 let ts_event = millis_to_nanos(trade.time)?;
83
84 TradeTick::new_checked(
85 instrument.id(),
86 price,
87 size,
88 aggressor,
89 trade_id,
90 ts_event,
91 ts_init,
92 )
93 .context("failed to construct TradeTick from Hyperliquid trade message")
94}
95
96pub fn parse_ws_order_book_deltas(
98 book: &WsBookData,
99 instrument: &InstrumentAny,
100 ts_init: UnixNanos,
101) -> anyhow::Result<OrderBookDeltas> {
102 let ts_event = millis_to_nanos(book.time)?;
103 let mut deltas = Vec::new();
104
105 deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
107
108 for level in &book.levels[0] {
110 let price = parse_price(&level.px, instrument, "book.bid.px")?;
111 let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
112
113 if !size.is_positive() {
114 continue;
115 }
116
117 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
118
119 let delta = OrderBookDelta::new(
120 instrument.id(),
121 BookAction::Add,
122 order,
123 RecordFlag::F_LAST as u8,
124 0, ts_event,
126 ts_init,
127 );
128
129 deltas.push(delta);
130 }
131
132 for level in &book.levels[1] {
134 let price = parse_price(&level.px, instrument, "book.ask.px")?;
135 let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
136
137 if !size.is_positive() {
138 continue;
139 }
140
141 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
142
143 let delta = OrderBookDelta::new(
144 instrument.id(),
145 BookAction::Add,
146 order,
147 RecordFlag::F_LAST as u8,
148 0, ts_event,
150 ts_init,
151 );
152
153 deltas.push(delta);
154 }
155
156 Ok(OrderBookDeltas::new(instrument.id(), deltas))
157}
158
159pub fn parse_ws_quote_tick(
161 bbo: &WsBboData,
162 instrument: &InstrumentAny,
163 ts_init: UnixNanos,
164) -> anyhow::Result<QuoteTick> {
165 let bid_level = bbo.bbo[0]
166 .as_ref()
167 .context("BBO message missing bid level")?;
168 let ask_level = bbo.bbo[1]
169 .as_ref()
170 .context("BBO message missing ask level")?;
171
172 let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
173 let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
174 let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
175 let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
176
177 let ts_event = millis_to_nanos(bbo.time)?;
178
179 QuoteTick::new_checked(
180 instrument.id(),
181 bid_price,
182 ask_price,
183 bid_size,
184 ask_size,
185 ts_event,
186 ts_init,
187 )
188 .context("failed to construct QuoteTick from Hyperliquid BBO message")
189}
190
191pub fn parse_ws_candle(
193 candle: &CandleData,
194 instrument: &InstrumentAny,
195 bar_type: &BarType,
196 ts_init: UnixNanos,
197) -> anyhow::Result<Bar> {
198 let open = parse_price(&candle.o, instrument, "candle.o")?;
199 let high = parse_price(&candle.h, instrument, "candle.h")?;
200 let low = parse_price(&candle.l, instrument, "candle.l")?;
201 let close = parse_price(&candle.c, instrument, "candle.c")?;
202 let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
203
204 let ts_event = millis_to_nanos(candle.t)?;
205
206 Ok(Bar::new(
207 *bar_type, open, high, low, close, volume, ts_event, ts_init,
208 ))
209}
210
211pub fn parse_ws_order_status_report(
216 order: &WsOrderData,
217 instrument: &InstrumentAny,
218 account_id: AccountId,
219 ts_init: UnixNanos,
220) -> anyhow::Result<OrderStatusReport> {
221 let instrument_id = instrument.id();
222 let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
223 let order_side = OrderSide::from(order.order.side);
224
225 let order_type = if is_conditional_order_data(
227 order.order.trigger_px.as_deref(),
228 order.order.tpsl.as_ref(),
229 ) {
230 if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
231 parse_trigger_order_type(is_market, tpsl)
232 } else {
233 OrderType::Limit }
235 } else {
236 OrderType::Limit };
238
239 let time_in_force = TimeInForce::Gtc;
240 let order_status = OrderStatus::from(order.status);
241
242 let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
244 let remaining_qty = parse_quantity(&order.order.sz, instrument, "order.sz")?;
245 let filled_qty = Quantity::from_raw(
246 orig_qty.raw.saturating_sub(remaining_qty.raw),
247 instrument.size_precision(),
248 );
249
250 let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
251
252 let ts_accepted = millis_to_nanos(order.order.timestamp)?;
253 let ts_last = millis_to_nanos(order.status_timestamp)?;
254
255 let mut report = OrderStatusReport::new(
256 account_id,
257 instrument_id,
258 None, venue_order_id,
260 order_side,
261 order_type,
262 time_in_force,
263 order_status,
264 orig_qty, filled_qty,
266 ts_accepted,
267 ts_last,
268 ts_init,
269 Some(UUID4::new()),
270 );
271
272 if let Some(ref cloid) = order.order.cloid {
273 report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
274 }
275
276 report = report.with_price(price);
277
278 if let Some(ref trigger_px_str) = order.order.trigger_px {
279 let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
280 report = report.with_trigger_price(trigger_price);
281 }
282
283 Ok(report)
284}
285
286pub fn parse_ws_fill_report(
290 fill: &WsFillData,
291 instrument: &InstrumentAny,
292 account_id: AccountId,
293 ts_init: UnixNanos,
294) -> anyhow::Result<FillReport> {
295 let instrument_id = instrument.id();
296 let venue_order_id = VenueOrderId::new(fill.oid.to_string());
297 let trade_id = TradeId::new_checked(fill.tid.to_string())
298 .context("invalid trade identifier in Hyperliquid fill message")?;
299
300 let order_side = OrderSide::from(fill.side);
301 let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
302 let last_px = parse_price(&fill.px, instrument, "fill.px")?;
303 let liquidity_side = if fill.crossed {
304 LiquiditySide::Taker
305 } else {
306 LiquiditySide::Maker
307 };
308
309 let fee_amount = Decimal::from_str(&fill.fee)
310 .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?;
311
312 let commission_currency = if fill.fee_token == "USDC" {
313 Currency::from("USDC")
314 } else {
315 instrument.quote_currency()
316 };
317
318 let commission = Money::from_decimal(fee_amount, commission_currency)
319 .with_context(|| format!("Failed to create commission from fee='{}'", fill.fee))?;
320 let ts_event = millis_to_nanos(fill.time)?;
321
322 let client_order_id = None;
324
325 Ok(FillReport::new(
326 account_id,
327 instrument_id,
328 venue_order_id,
329 trade_id,
330 order_side,
331 last_qty,
332 last_px,
333 commission,
334 liquidity_side,
335 client_order_id,
336 None, ts_event,
338 ts_init,
339 None, ))
341}
342
343pub fn parse_ws_asset_context(
349 ctx: &WsActiveAssetCtxData,
350 instrument: &InstrumentAny,
351 ts_init: UnixNanos,
352) -> anyhow::Result<(
353 MarkPriceUpdate,
354 Option<IndexPriceUpdate>,
355 Option<FundingRateUpdate>,
356)> {
357 let instrument_id = instrument.id();
358
359 match ctx {
360 WsActiveAssetCtxData::Perp { coin: _, ctx } => {
361 let mark_px_f64 = ctx
362 .shared
363 .mark_px
364 .parse::<f64>()
365 .context("Failed to parse mark_px as f64")?;
366 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
367 let mark_price_update =
368 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
369
370 let oracle_px_f64 = ctx
371 .oracle_px
372 .parse::<f64>()
373 .context("Failed to parse oracle_px as f64")?;
374 let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
375 let index_price_update =
376 IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
377
378 let funding_f64 = ctx
379 .funding
380 .parse::<f64>()
381 .context("Failed to parse funding as f64")?;
382 let funding_rate_decimal = Decimal::from_f64(funding_f64)
383 .context("Failed to convert funding rate to Decimal")?;
384 let funding_rate_update = FundingRateUpdate::new(
385 instrument_id,
386 funding_rate_decimal,
387 None, ts_init,
389 ts_init,
390 );
391
392 Ok((
393 mark_price_update,
394 Some(index_price_update),
395 Some(funding_rate_update),
396 ))
397 }
398 WsActiveAssetCtxData::Spot { coin: _, ctx } => {
399 let mark_px_f64 = ctx
400 .shared
401 .mark_px
402 .parse::<f64>()
403 .context("Failed to parse mark_px as f64")?;
404 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
405 let mark_price_update =
406 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
407
408 Ok((mark_price_update, None, None))
409 }
410 }
411}
412
413fn parse_f64_price(
415 price: f64,
416 instrument: &InstrumentAny,
417 field_name: &str,
418) -> anyhow::Result<Price> {
419 if !price.is_finite() {
420 anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
421 }
422 Ok(Price::new(price, instrument.price_precision()))
423}
424
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// Builds the BTC-PERP crypto perpetual used as the instrument in every test.
    fn create_test_instrument() -> InstrumentAny {
        let symbol = Symbol::new("BTC-PERP");
        let instrument_id = InstrumentId::new(symbol, Venue::new("HYPERLIQUID"));

        let perpetual = CryptoPerpetual::new(
            instrument_id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Builds the shared asset context section used by the perp and spot context tests.
    fn create_shared_ctx() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: "1000000.0".to_string(),
            prev_day_px: "49000.0".to_string(),
            mark_px: "50000.0".to_string(),
            mid_px: Some("50001.0".to_string()),
            impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
            day_base_vlm: Some("100.0".to_string()),
        }
    }

    /// Builds a single book level with one resting order.
    fn create_level(px: &str, sz: &str) -> WsLevelData {
        WsLevelData {
            px: px.to_string(),
            sz: sz.to_string(),
            n: 1,
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = create_test_instrument();

        let basic_order = WsBasicOrderData {
            coin: Ustr::from("BTC"),
            side: HyperliquidSide::Buy,
            limit_px: "50000.0".to_string(),
            sz: "0.5".to_string(),
            oid: 12345,
            timestamp: 1704470400000,
            orig_sz: "1.0".to_string(),
            cloid: Some("test-order-1".to_string()),
            trigger_px: None,
            is_market: None,
            tpsl: None,
            trigger_activated: None,
            trailing_stop: None,
        };
        let order_data = WsOrderData {
            order: basic_order,
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let report = parse_ws_order_status_report(
            &order_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .expect("order status report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = create_test_instrument();

        let fill_data = WsFillData {
            coin: Ustr::from("BTC"),
            px: "50000.0".to_string(),
            sz: "0.1".to_string(),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: "0.0".to_string(),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: "0.0".to_string(),
            hash: "0xabc123".to_string(),
            oid: 12345,
            crossed: true,
            fee: "0.05".to_string(),
            tid: 98765,
            liquidation: None,
            fee_token: "USDC".to_string(),
            builder_fee: None,
            cloid: Some("0xd211f1c27288259290850338d22132a0".to_string()),
            twap_id: None,
        };

        let report = parse_ws_fill_report(
            &fill_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .expect("fill report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        // `crossed: true` must map to the taker side.
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = create_test_instrument();

        let book = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [
                vec![create_level("50000.0", "1.0")],
                vec![create_level("50001.0", "2.0")],
            ],
            time: 1_704_470_400_000,
        };

        let parsed =
            parse_ws_order_book_deltas(&book, &instrument, UnixNanos::default()).unwrap();

        // One clear, one bid add, one ask add.
        assert_eq!(parsed.deltas.len(), 3);
        assert_eq!(parsed.deltas[0].action, BookAction::Clear);

        let expected_adds = [(1, OrderSide::Buy), (2, OrderSide::Sell)];
        for (index, expected_side) in expected_adds {
            let delta = &parsed.deltas[index];
            assert_eq!(delta.action, BookAction::Add);
            assert_eq!(delta.order.side, expected_side);
            assert!(delta.order.size.is_positive());
            assert_eq!(delta.order.order_id, 0);
        }
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: create_shared_ctx(),
                funding: "0.0001".to_string(),
                open_interest: "100000.0".to_string(),
                oracle_px: "50005.0".to_string(),
                premium: Some("-0.0001".to_string()),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default())
                .expect("perp asset context should parse");

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);

        let index = index_price.expect("perp context should yield an index price");
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        let funding = funding_rate.expect("perp context should yield a funding rate");
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: create_shared_ctx(),
                circulating_supply: "19000000.0".to_string(),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default())
                .expect("spot asset context should parse");

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);
        // Spot contexts carry neither an index price nor a funding rate.
        assert!(index_price.is_none());
        assert!(funding_rate.is_none());
    }
}