Skip to main content

nautilus_hyperliquid/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing helpers for Hyperliquid WebSocket payloads.
17
18use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23    data::{
24        Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25        OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26    },
27    enums::{
28        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29        TimeInForce,
30    },
31    identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport},
34    types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{Decimal, prelude::FromPrimitive};
37
38use super::messages::{
39    CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
40};
41use crate::common::parse::{is_conditional_order_data, millis_to_nanos, parse_trigger_order_type};
42
43/// Helper to parse a price string with instrument precision.
44fn parse_price(
45    price_str: &str,
46    instrument: &InstrumentAny,
47    field_name: &str,
48) -> anyhow::Result<Price> {
49    let decimal = Decimal::from_str(price_str)
50        .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
51
52    Price::from_decimal_dp(decimal, instrument.price_precision())
53        .with_context(|| format!("Failed to create price from '{price_str}' for {field_name}"))
54}
55
56/// Helper to parse a quantity string with instrument precision.
57fn parse_quantity(
58    quantity_str: &str,
59    instrument: &InstrumentAny,
60    field_name: &str,
61) -> anyhow::Result<Quantity> {
62    let decimal = Decimal::from_str(quantity_str).with_context(|| {
63        format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
64    })?;
65
66    Quantity::from_decimal_dp(decimal.abs(), instrument.size_precision()).with_context(|| {
67        format!("Failed to create quantity from '{quantity_str}' for {field_name}")
68    })
69}
70
71/// Parses a WebSocket trade frame into a [`TradeTick`].
72pub fn parse_ws_trade_tick(
73    trade: &WsTradeData,
74    instrument: &InstrumentAny,
75    ts_init: UnixNanos,
76) -> anyhow::Result<TradeTick> {
77    let price = parse_price(&trade.px, instrument, "trade.px")?;
78    let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
79    let aggressor = AggressorSide::from(trade.side);
80    let trade_id = TradeId::new_checked(trade.tid.to_string())
81        .context("invalid trade identifier in Hyperliquid trade message")?;
82    let ts_event = millis_to_nanos(trade.time)?;
83
84    TradeTick::new_checked(
85        instrument.id(),
86        price,
87        size,
88        aggressor,
89        trade_id,
90        ts_event,
91        ts_init,
92    )
93    .context("failed to construct TradeTick from Hyperliquid trade message")
94}
95
96/// Parses a WebSocket L2 order book message into [`OrderBookDeltas`].
97pub fn parse_ws_order_book_deltas(
98    book: &WsBookData,
99    instrument: &InstrumentAny,
100    ts_init: UnixNanos,
101) -> anyhow::Result<OrderBookDeltas> {
102    let ts_event = millis_to_nanos(book.time)?;
103    let mut deltas = Vec::new();
104
105    // Treat every book payload as a snapshot: clear existing depth and rebuild it
106    deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
107
108    // Parse bids
109    for level in &book.levels[0] {
110        let price = parse_price(&level.px, instrument, "book.bid.px")?;
111        let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
112
113        if !size.is_positive() {
114            continue;
115        }
116
117        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
118
119        let delta = OrderBookDelta::new(
120            instrument.id(),
121            BookAction::Add,
122            order,
123            RecordFlag::F_LAST as u8,
124            0, // sequence
125            ts_event,
126            ts_init,
127        );
128
129        deltas.push(delta);
130    }
131
132    // Parse asks
133    for level in &book.levels[1] {
134        let price = parse_price(&level.px, instrument, "book.ask.px")?;
135        let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
136
137        if !size.is_positive() {
138            continue;
139        }
140
141        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
142
143        let delta = OrderBookDelta::new(
144            instrument.id(),
145            BookAction::Add,
146            order,
147            RecordFlag::F_LAST as u8,
148            0, // sequence
149            ts_event,
150            ts_init,
151        );
152
153        deltas.push(delta);
154    }
155
156    Ok(OrderBookDeltas::new(instrument.id(), deltas))
157}
158
159/// Parses a WebSocket BBO (best bid/offer) message into a [`QuoteTick`].
160pub fn parse_ws_quote_tick(
161    bbo: &WsBboData,
162    instrument: &InstrumentAny,
163    ts_init: UnixNanos,
164) -> anyhow::Result<QuoteTick> {
165    let bid_level = bbo.bbo[0]
166        .as_ref()
167        .context("BBO message missing bid level")?;
168    let ask_level = bbo.bbo[1]
169        .as_ref()
170        .context("BBO message missing ask level")?;
171
172    let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
173    let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
174    let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
175    let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
176
177    let ts_event = millis_to_nanos(bbo.time)?;
178
179    QuoteTick::new_checked(
180        instrument.id(),
181        bid_price,
182        ask_price,
183        bid_size,
184        ask_size,
185        ts_event,
186        ts_init,
187    )
188    .context("failed to construct QuoteTick from Hyperliquid BBO message")
189}
190
191/// Parses a WebSocket candle message into a [`Bar`].
192pub fn parse_ws_candle(
193    candle: &CandleData,
194    instrument: &InstrumentAny,
195    bar_type: &BarType,
196    ts_init: UnixNanos,
197) -> anyhow::Result<Bar> {
198    let open = parse_price(&candle.o, instrument, "candle.o")?;
199    let high = parse_price(&candle.h, instrument, "candle.h")?;
200    let low = parse_price(&candle.l, instrument, "candle.l")?;
201    let close = parse_price(&candle.c, instrument, "candle.c")?;
202    let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
203
204    let ts_event = millis_to_nanos(candle.t)?;
205
206    Ok(Bar::new(
207        *bar_type, open, high, low, close, volume, ts_event, ts_init,
208    ))
209}
210
211/// Parses a WebSocket order update message into an [`OrderStatusReport`].
212///
213/// This converts Hyperliquid order data from WebSocket into Nautilus order status reports.
214/// Handles both regular and conditional orders (stop/limit-if-touched).
215pub fn parse_ws_order_status_report(
216    order: &WsOrderData,
217    instrument: &InstrumentAny,
218    account_id: AccountId,
219    ts_init: UnixNanos,
220) -> anyhow::Result<OrderStatusReport> {
221    let instrument_id = instrument.id();
222    let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
223    let order_side = OrderSide::from(order.order.side);
224
225    // Determine order type based on trigger info
226    let order_type = if is_conditional_order_data(
227        order.order.trigger_px.as_deref(),
228        order.order.tpsl.as_ref(),
229    ) {
230        if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
231            parse_trigger_order_type(is_market, tpsl)
232        } else {
233            OrderType::Limit // fallback
234        }
235    } else {
236        OrderType::Limit // Regular limit order
237    };
238
239    let time_in_force = TimeInForce::Gtc;
240    let order_status = OrderStatus::from(order.status);
241
242    // orig_sz is the original order quantity, sz is the remaining quantity
243    let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
244    let remaining_qty = parse_quantity(&order.order.sz, instrument, "order.sz")?;
245    let filled_qty = Quantity::from_raw(
246        orig_qty.raw.saturating_sub(remaining_qty.raw),
247        instrument.size_precision(),
248    );
249
250    let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
251
252    let ts_accepted = millis_to_nanos(order.order.timestamp)?;
253    let ts_last = millis_to_nanos(order.status_timestamp)?;
254
255    let mut report = OrderStatusReport::new(
256        account_id,
257        instrument_id,
258        None, // venue_order_id_modified
259        venue_order_id,
260        order_side,
261        order_type,
262        time_in_force,
263        order_status,
264        orig_qty, // Use original quantity, not remaining
265        filled_qty,
266        ts_accepted,
267        ts_last,
268        ts_init,
269        Some(UUID4::new()),
270    );
271
272    if let Some(ref cloid) = order.order.cloid {
273        report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
274    }
275
276    report = report.with_price(price);
277
278    if let Some(ref trigger_px_str) = order.order.trigger_px {
279        let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
280        report = report.with_trigger_price(trigger_price);
281    }
282
283    Ok(report)
284}
285
286/// Parses a WebSocket fill message into a [`FillReport`].
287///
288/// This converts Hyperliquid fill data from WebSocket user events into Nautilus fill reports.
289pub fn parse_ws_fill_report(
290    fill: &WsFillData,
291    instrument: &InstrumentAny,
292    account_id: AccountId,
293    ts_init: UnixNanos,
294) -> anyhow::Result<FillReport> {
295    let instrument_id = instrument.id();
296    let venue_order_id = VenueOrderId::new(fill.oid.to_string());
297    let trade_id = TradeId::new_checked(fill.tid.to_string())
298        .context("invalid trade identifier in Hyperliquid fill message")?;
299
300    let order_side = OrderSide::from(fill.side);
301    let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
302    let last_px = parse_price(&fill.px, instrument, "fill.px")?;
303    let liquidity_side = if fill.crossed {
304        LiquiditySide::Taker
305    } else {
306        LiquiditySide::Maker
307    };
308
309    let fee_amount = Decimal::from_str(&fill.fee)
310        .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?;
311
312    let commission_currency = if fill.fee_token == "USDC" {
313        Currency::from("USDC")
314    } else {
315        instrument.quote_currency()
316    };
317
318    let commission = Money::from_decimal(fee_amount, commission_currency)
319        .with_context(|| format!("Failed to create commission from fee='{}'", fill.fee))?;
320    let ts_event = millis_to_nanos(fill.time)?;
321
322    // No client order ID available in fill data directly
323    let client_order_id = None;
324
325    Ok(FillReport::new(
326        account_id,
327        instrument_id,
328        venue_order_id,
329        trade_id,
330        order_side,
331        last_qty,
332        last_px,
333        commission,
334        liquidity_side,
335        client_order_id,
336        None, // venue_position_id
337        ts_event,
338        ts_init,
339        None, // report_id
340    ))
341}
342
343/// Parses a WebSocket ActiveAssetCtx message into mark price, index price, and funding rate updates.
344///
345/// This converts Hyperliquid asset context data into Nautilus price and funding rate updates.
346/// Returns a tuple of (`MarkPriceUpdate`, `Option<IndexPriceUpdate>`, `Option<FundingRateUpdate>`).
347/// Index price and funding rate are only present for perpetual contracts.
348pub fn parse_ws_asset_context(
349    ctx: &WsActiveAssetCtxData,
350    instrument: &InstrumentAny,
351    ts_init: UnixNanos,
352) -> anyhow::Result<(
353    MarkPriceUpdate,
354    Option<IndexPriceUpdate>,
355    Option<FundingRateUpdate>,
356)> {
357    let instrument_id = instrument.id();
358
359    match ctx {
360        WsActiveAssetCtxData::Perp { coin: _, ctx } => {
361            let mark_px_f64 = ctx
362                .shared
363                .mark_px
364                .parse::<f64>()
365                .context("Failed to parse mark_px as f64")?;
366            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
367            let mark_price_update =
368                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
369
370            let oracle_px_f64 = ctx
371                .oracle_px
372                .parse::<f64>()
373                .context("Failed to parse oracle_px as f64")?;
374            let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
375            let index_price_update =
376                IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
377
378            let funding_f64 = ctx
379                .funding
380                .parse::<f64>()
381                .context("Failed to parse funding as f64")?;
382            let funding_rate_decimal = Decimal::from_f64(funding_f64)
383                .context("Failed to convert funding rate to Decimal")?;
384            let funding_rate_update = FundingRateUpdate::new(
385                instrument_id,
386                funding_rate_decimal,
387                None, // Hyperliquid doesn't provide next funding time in this message
388                ts_init,
389                ts_init,
390            );
391
392            Ok((
393                mark_price_update,
394                Some(index_price_update),
395                Some(funding_rate_update),
396            ))
397        }
398        WsActiveAssetCtxData::Spot { coin: _, ctx } => {
399            let mark_px_f64 = ctx
400                .shared
401                .mark_px
402                .parse::<f64>()
403                .context("Failed to parse mark_px as f64")?;
404            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
405            let mark_price_update =
406                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
407
408            Ok((mark_price_update, None, None))
409        }
410    }
411}
412
413/// Helper to parse an f64 price into a Price with instrument precision.
414fn parse_f64_price(
415    price: f64,
416    instrument: &InstrumentAny,
417    field_name: &str,
418) -> anyhow::Result<Price> {
419    if !price.is_finite() {
420        anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
421    }
422    Ok(Price::new(price, instrument.price_precision()))
423}
424
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// BTC-PERP on HYPERLIQUID, quoted and settled in USDC, with 2 price and 3 size decimals.
    fn create_test_instrument() -> InstrumentAny {
        let venue = Venue::new("HYPERLIQUID");
        let instrument_id = InstrumentId::new(Symbol::new("BTC-PERP"), venue);

        let perp = CryptoPerpetual::new(
            instrument_id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false, // is_inverse
            2,     // price_precision
            3,     // size_precision
            Price::from("0.01"),
            Quantity::from("0.001"),
            None, // multiplier
            None, // lot_size
            None, // max_quantity
            None, // min_quantity
            None, // max_notional
            None, // min_notional
            None, // max_price
            None, // min_price
            None, // margin_init
            None, // margin_maint
            None, // maker_fee
            None, // taker_fee
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perp)
    }

    /// Shared asset-context section used by both the perp and spot fixtures.
    fn create_shared_ctx() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: "1000000.0".to_string(),
            prev_day_px: "49000.0".to_string(),
            mark_px: "50000.0".to_string(),
            mid_px: Some("50001.0".to_string()),
            impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
            day_base_vlm: Some("100.0".to_string()),
        }
    }

    /// Single book level backed by one order.
    fn create_level(px: &str, sz: &str) -> WsLevelData {
        WsLevelData {
            px: px.to_string(),
            sz: sz.to_string(),
            n: 1,
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = create_test_instrument();

        let order_data = WsOrderData {
            order: WsBasicOrderData {
                coin: Ustr::from("BTC"),
                side: HyperliquidSide::Buy,
                limit_px: "50000.0".to_string(),
                sz: "0.5".to_string(),
                oid: 12345,
                timestamp: 1704470400000,
                orig_sz: "1.0".to_string(),
                cloid: Some("test-order-1".to_string()),
                trigger_px: None,
                is_market: None,
                tpsl: None,
                trigger_activated: None,
                trailing_stop: None,
            },
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let report = parse_ws_order_status_report(
            &order_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .unwrap();

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = create_test_instrument();

        let fill_data = WsFillData {
            coin: Ustr::from("BTC"),
            px: "50000.0".to_string(),
            sz: "0.1".to_string(),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: "0.0".to_string(),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: "0.0".to_string(),
            hash: "0xabc123".to_string(),
            oid: 12345,
            crossed: true,
            fee: "0.05".to_string(),
            tid: 98765,
            liquidation: None,
            fee_token: "USDC".to_string(),
            builder_fee: None,
            cloid: Some("0xd211f1c27288259290850338d22132a0".to_string()),
            twap_id: None,
        };

        let report = parse_ws_fill_report(
            &fill_data,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        )
        .unwrap();

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = create_test_instrument();

        let book = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [
                vec![create_level("50000.0", "1.0")],
                vec![create_level("50001.0", "2.0")],
            ],
            time: 1_704_470_400_000,
        };

        let deltas =
            parse_ws_order_book_deltas(&book, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(deltas.deltas.len(), 3); // clear + bid + ask
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);

        // The bid delta must come first, followed by the ask delta
        let expected_sides = [OrderSide::Buy, OrderSide::Sell];
        for (delta, expected_side) in deltas.deltas[1..].iter().zip(expected_sides) {
            assert_eq!(delta.action, BookAction::Add);
            assert_eq!(delta.order.side, expected_side);
            assert!(delta.order.size.is_positive());
            assert_eq!(delta.order.order_id, 0);
        }
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: create_shared_ctx(),
                funding: "0.0001".to_string(),
                open_interest: "100000.0".to_string(),
                oracle_px: "50005.0".to_string(),
                premium: Some("-0.0001".to_string()),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);

        let index = index_price.expect("perp context should produce an index price update");
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        let funding = funding_rate.expect("perp context should produce a funding rate update");
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = create_test_instrument();

        let ctx_data = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: create_shared_ctx(),
                circulating_supply: "19000000.0".to_string(),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);
        assert!(index_price.is_none());
        assert!(funding_rate.is_none());
    }
}