1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{
24 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26 },
27 enums::{
28 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29 TimeInForce,
30 },
31 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport},
34 types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{
37 Decimal,
38 prelude::{FromPrimitive, ToPrimitive},
39};
40
41use super::messages::{
42 CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
43};
44use crate::common::parse::{
45 is_conditional_order_data, parse_millis_to_nanos, parse_trigger_order_type,
46};
47
48fn parse_price(
50 price_str: &str,
51 instrument: &InstrumentAny,
52 field_name: &str,
53) -> anyhow::Result<Price> {
54 let decimal = Decimal::from_str(price_str)
55 .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
56
57 let value = decimal.to_f64().ok_or_else(|| {
58 anyhow::anyhow!(
59 "Failed to convert price '{price_str}' to f64 for {field_name} (out of range or too much precision)"
60 )
61 })?;
62
63 Ok(Price::new(value, instrument.price_precision()))
64}
65
66fn parse_quantity(
68 quantity_str: &str,
69 instrument: &InstrumentAny,
70 field_name: &str,
71) -> anyhow::Result<Quantity> {
72 let decimal = Decimal::from_str(quantity_str).with_context(|| {
73 format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
74 })?;
75
76 let value = decimal.abs().to_f64().ok_or_else(|| {
77 anyhow::anyhow!(
78 "Failed to convert quantity '{quantity_str}' to f64 for {field_name} (out of range or too much precision)"
79 )
80 })?;
81
82 Ok(Quantity::new(value, instrument.size_precision()))
83}
84
85pub fn parse_ws_trade_tick(
87 trade: &WsTradeData,
88 instrument: &InstrumentAny,
89 ts_init: UnixNanos,
90) -> anyhow::Result<TradeTick> {
91 let price = parse_price(&trade.px, instrument, "trade.px")?;
92 let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
93 let aggressor = AggressorSide::from(trade.side);
94 let trade_id = TradeId::new_checked(trade.tid.to_string())
95 .context("invalid trade identifier in Hyperliquid trade message")?;
96 let ts_event = parse_millis_to_nanos(trade.time);
97
98 TradeTick::new_checked(
99 instrument.id(),
100 price,
101 size,
102 aggressor,
103 trade_id,
104 ts_event,
105 ts_init,
106 )
107 .context("failed to construct TradeTick from Hyperliquid trade message")
108}
109
110pub fn parse_ws_order_book_deltas(
112 book: &WsBookData,
113 instrument: &InstrumentAny,
114 ts_init: UnixNanos,
115) -> anyhow::Result<OrderBookDeltas> {
116 let ts_event = parse_millis_to_nanos(book.time);
117 let mut deltas = Vec::new();
118
119 deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
121
122 for level in &book.levels[0] {
124 let price = parse_price(&level.px, instrument, "book.bid.px")?;
125 let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
126
127 if !size.is_positive() {
128 continue;
129 }
130
131 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
132
133 let delta = OrderBookDelta::new(
134 instrument.id(),
135 BookAction::Add,
136 order,
137 RecordFlag::F_LAST as u8,
138 0, ts_event,
140 ts_init,
141 );
142
143 deltas.push(delta);
144 }
145
146 for level in &book.levels[1] {
148 let price = parse_price(&level.px, instrument, "book.ask.px")?;
149 let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
150
151 if !size.is_positive() {
152 continue;
153 }
154
155 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
156
157 let delta = OrderBookDelta::new(
158 instrument.id(),
159 BookAction::Add,
160 order,
161 RecordFlag::F_LAST as u8,
162 0, ts_event,
164 ts_init,
165 );
166
167 deltas.push(delta);
168 }
169
170 Ok(OrderBookDeltas::new(instrument.id(), deltas))
171}
172
173pub fn parse_ws_quote_tick(
175 bbo: &WsBboData,
176 instrument: &InstrumentAny,
177 ts_init: UnixNanos,
178) -> anyhow::Result<QuoteTick> {
179 let bid_level = bbo.bbo[0]
180 .as_ref()
181 .context("BBO message missing bid level")?;
182 let ask_level = bbo.bbo[1]
183 .as_ref()
184 .context("BBO message missing ask level")?;
185
186 let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
187 let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
188 let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
189 let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
190
191 let ts_event = parse_millis_to_nanos(bbo.time);
192
193 QuoteTick::new_checked(
194 instrument.id(),
195 bid_price,
196 ask_price,
197 bid_size,
198 ask_size,
199 ts_event,
200 ts_init,
201 )
202 .context("failed to construct QuoteTick from Hyperliquid BBO message")
203}
204
205pub fn parse_ws_candle(
207 candle: &CandleData,
208 instrument: &InstrumentAny,
209 bar_type: &BarType,
210 ts_init: UnixNanos,
211) -> anyhow::Result<Bar> {
212 let open = parse_price(&candle.o, instrument, "candle.o")?;
213 let high = parse_price(&candle.h, instrument, "candle.h")?;
214 let low = parse_price(&candle.l, instrument, "candle.l")?;
215 let close = parse_price(&candle.c, instrument, "candle.c")?;
216 let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
217
218 let ts_event = parse_millis_to_nanos(candle.t);
219
220 Ok(Bar::new(
221 *bar_type, open, high, low, close, volume, ts_event, ts_init,
222 ))
223}
224
225pub fn parse_ws_order_status_report(
230 order: &WsOrderData,
231 instrument: &InstrumentAny,
232 account_id: AccountId,
233 ts_init: UnixNanos,
234) -> anyhow::Result<OrderStatusReport> {
235 let instrument_id = instrument.id();
236 let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
237 let order_side = OrderSide::from(order.order.side);
238
239 let order_type = if is_conditional_order_data(
241 order.order.trigger_px.as_deref(),
242 order.order.tpsl.as_ref(),
243 ) {
244 if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
245 parse_trigger_order_type(is_market, tpsl)
246 } else {
247 OrderType::Limit }
249 } else {
250 OrderType::Limit };
252
253 let time_in_force = TimeInForce::Gtc;
254 let order_status = OrderStatus::from(order.status);
255 let quantity = parse_quantity(&order.order.sz, instrument, "order.sz")?;
256
257 let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
259 let filled_qty = Quantity::from_raw(
260 orig_qty.raw.saturating_sub(quantity.raw),
261 instrument.size_precision(),
262 );
263
264 let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
265
266 let ts_accepted = parse_millis_to_nanos(order.order.timestamp);
267 let ts_last = parse_millis_to_nanos(order.status_timestamp);
268
269 let mut report = OrderStatusReport::new(
270 account_id,
271 instrument_id,
272 None, venue_order_id,
274 order_side,
275 order_type,
276 time_in_force,
277 order_status,
278 quantity,
279 filled_qty,
280 ts_accepted,
281 ts_last,
282 ts_init,
283 Some(UUID4::new()),
284 );
285
286 if let Some(ref cloid) = order.order.cloid {
287 report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
288 }
289
290 report = report.with_price(price);
291
292 if let Some(ref trigger_px_str) = order.order.trigger_px {
293 let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
294 report = report.with_trigger_price(trigger_price);
295 }
296
297 Ok(report)
298}
299
300pub fn parse_ws_fill_report(
304 fill: &WsFillData,
305 instrument: &InstrumentAny,
306 account_id: AccountId,
307 ts_init: UnixNanos,
308) -> anyhow::Result<FillReport> {
309 let instrument_id = instrument.id();
310 let venue_order_id = VenueOrderId::new(fill.oid.to_string());
311 let trade_id = TradeId::new_checked(fill.tid.to_string())
312 .context("invalid trade identifier in Hyperliquid fill message")?;
313
314 let order_side = OrderSide::from(fill.side);
315 let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
316 let last_px = parse_price(&fill.px, instrument, "fill.px")?;
317 let liquidity_side = if fill.crossed {
318 LiquiditySide::Taker
319 } else {
320 LiquiditySide::Maker
321 };
322
323 let commission_amount = Decimal::from_str(&fill.fee)
324 .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?
325 .abs()
326 .to_string()
327 .parse::<f64>()
328 .unwrap_or(0.0);
329
330 let commission_currency = if fill.fee_token == "USDC" {
331 Currency::from("USDC")
332 } else {
333 instrument.quote_currency()
335 };
336
337 let commission = Money::new(commission_amount, commission_currency);
338 let ts_event = parse_millis_to_nanos(fill.time);
339
340 let client_order_id = None;
342
343 Ok(FillReport::new(
344 account_id,
345 instrument_id,
346 venue_order_id,
347 trade_id,
348 order_side,
349 last_qty,
350 last_px,
351 commission,
352 liquidity_side,
353 client_order_id,
354 None, ts_event,
356 ts_init,
357 None, ))
359}
360
361pub fn parse_ws_asset_context(
367 ctx: &WsActiveAssetCtxData,
368 instrument: &InstrumentAny,
369 ts_init: UnixNanos,
370) -> anyhow::Result<(
371 MarkPriceUpdate,
372 Option<IndexPriceUpdate>,
373 Option<FundingRateUpdate>,
374)> {
375 let instrument_id = instrument.id();
376
377 match ctx {
378 WsActiveAssetCtxData::Perp { coin: _, ctx } => {
379 let mark_px_f64 = ctx
380 .shared
381 .mark_px
382 .parse::<f64>()
383 .context("Failed to parse mark_px as f64")?;
384 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
385 let mark_price_update =
386 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
387
388 let oracle_px_f64 = ctx
389 .oracle_px
390 .parse::<f64>()
391 .context("Failed to parse oracle_px as f64")?;
392 let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
393 let index_price_update =
394 IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
395
396 let funding_f64 = ctx
397 .funding
398 .parse::<f64>()
399 .context("Failed to parse funding as f64")?;
400 let funding_rate_decimal = Decimal::from_f64(funding_f64)
401 .context("Failed to convert funding rate to Decimal")?;
402 let funding_rate_update = FundingRateUpdate::new(
403 instrument_id,
404 funding_rate_decimal,
405 None, ts_init,
407 ts_init,
408 );
409
410 Ok((
411 mark_price_update,
412 Some(index_price_update),
413 Some(funding_rate_update),
414 ))
415 }
416 WsActiveAssetCtxData::Spot { coin: _, ctx } => {
417 let mark_px_f64 = ctx
418 .shared
419 .mark_px
420 .parse::<f64>()
421 .context("Failed to parse mark_px as f64")?;
422 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
423 let mark_price_update =
424 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
425
426 Ok((mark_price_update, None, None))
427 }
428 }
429}
430
431fn parse_f64_price(
433 price: f64,
434 instrument: &InstrumentAny,
435 field_name: &str,
436) -> anyhow::Result<Price> {
437 if !price.is_finite() {
438 anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
439 }
440 Ok(Price::new(price, instrument.price_precision()))
441}
442
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// BTC-PERP test instrument with price precision 2 and size precision 3.
    fn create_test_instrument() -> InstrumentAny {
        let id = InstrumentId::new(Symbol::new("BTC-PERP"), Venue::new("HYPERLIQUID"));
        let perp = CryptoPerpetual::new(
            id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perp)
    }

    /// Shared asset-context fixture used by both the perp and spot context tests.
    fn shared_ctx_fixture() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: "1000000.0".to_string(),
            prev_day_px: "49000.0".to_string(),
            mark_px: "50000.0".to_string(),
            mid_px: Some("50001.0".to_string()),
            impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
            day_base_vlm: Some("100.0".to_string()),
        }
    }

    /// Single book level fixture with an order count of 1.
    fn book_level(px: &str, sz: &str) -> WsLevelData {
        WsLevelData {
            px: px.to_string(),
            sz: sz.to_string(),
            n: 1,
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = create_test_instrument();

        let basic_order = WsBasicOrderData {
            coin: Ustr::from("BTC"),
            side: HyperliquidSide::Buy,
            limit_px: "50000.0".to_string(),
            sz: "0.5".to_string(),
            oid: 12345,
            timestamp: 1704470400000,
            orig_sz: "1.0".to_string(),
            cloid: Some("test-order-1".to_string()),
            trigger_px: None,
            is_market: None,
            tpsl: None,
            trigger_activated: None,
            trailing_stop: None,
        };
        let order_data = WsOrderData {
            order: basic_order,
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let report = parse_ws_order_status_report(
            &order_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .unwrap();

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = create_test_instrument();

        let fill_data = WsFillData {
            coin: Ustr::from("BTC"),
            px: "50000.0".to_string(),
            sz: "0.1".to_string(),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: "0.0".to_string(),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: "0.0".to_string(),
            hash: "0xabc123".to_string(),
            oid: 12345,
            crossed: true,
            fee: "0.05".to_string(),
            tid: 98765,
            liquidation: None,
            fee_token: "USDC".to_string(),
            builder_fee: None,
        };

        let report = parse_ws_fill_report(
            &fill_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .unwrap();

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = create_test_instrument();

        let book = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [
                vec![book_level("50000.0", "1.0")],
                vec![book_level("50001.0", "2.0")],
            ],
            time: 1_704_470_400_000,
        };

        let parsed =
            parse_ws_order_book_deltas(&book, &instrument, UnixNanos::default()).unwrap();

        // One clear delta followed by one add per side.
        assert_eq!(parsed.deltas.len(), 3);
        assert_eq!(parsed.deltas[0].action, BookAction::Clear);

        let expected_adds = [(1, OrderSide::Buy), (2, OrderSide::Sell)];
        for (idx, expected_side) in expected_adds {
            let delta = &parsed.deltas[idx];
            assert_eq!(delta.action, BookAction::Add);
            assert_eq!(delta.order.side, expected_side);
            assert!(delta.order.size.is_positive());
            assert_eq!(delta.order.order_id, 0);
        }
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: shared_ctx_fixture(),
                funding: "0.0001".to_string(),
                open_interest: "100000.0".to_string(),
                oracle_px: "50005.0".to_string(),
                premium: Some("-0.0001".to_string()),
            },
        };

        let (mark, index, funding) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);

        let index = index.unwrap();
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        let funding = funding.unwrap();
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: shared_ctx_fixture(),
                circulating_supply: "19000000.0".to_string(),
            },
        };

        let (mark, index, funding) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);
        assert!(index.is_none());
        assert!(funding.is_none());
    }
}