nautilus_hyperliquid/websocket/
parse.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parsing helpers for Hyperliquid WebSocket payloads.
17
18use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23    data::{
24        Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25        OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26    },
27    enums::{
28        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29        TimeInForce,
30    },
31    identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport},
34    types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{
37    Decimal,
38    prelude::{FromPrimitive, ToPrimitive},
39};
40
41use super::messages::{
42    CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
43};
44use crate::common::parse::{
45    is_conditional_order_data, parse_millis_to_nanos, parse_trigger_order_type,
46};
47
48/// Helper to parse a price string with instrument precision.
49fn parse_price(
50    price_str: &str,
51    instrument: &InstrumentAny,
52    field_name: &str,
53) -> anyhow::Result<Price> {
54    let decimal = Decimal::from_str(price_str)
55        .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
56
57    let value = decimal.to_f64().ok_or_else(|| {
58        anyhow::anyhow!(
59            "Failed to convert price '{price_str}' to f64 for {field_name} (out of range or too much precision)"
60        )
61    })?;
62
63    Ok(Price::new(value, instrument.price_precision()))
64}
65
66/// Helper to parse a quantity string with instrument precision.
67fn parse_quantity(
68    quantity_str: &str,
69    instrument: &InstrumentAny,
70    field_name: &str,
71) -> anyhow::Result<Quantity> {
72    let decimal = Decimal::from_str(quantity_str).with_context(|| {
73        format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
74    })?;
75
76    let value = decimal.abs().to_f64().ok_or_else(|| {
77        anyhow::anyhow!(
78            "Failed to convert quantity '{quantity_str}' to f64 for {field_name} (out of range or too much precision)"
79        )
80    })?;
81
82    Ok(Quantity::new(value, instrument.size_precision()))
83}
84
85/// Parses a WebSocket trade frame into a [`TradeTick`].
86pub fn parse_ws_trade_tick(
87    trade: &WsTradeData,
88    instrument: &InstrumentAny,
89    ts_init: UnixNanos,
90) -> anyhow::Result<TradeTick> {
91    let price = parse_price(&trade.px, instrument, "trade.px")?;
92    let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
93    let aggressor = AggressorSide::from(trade.side);
94    let trade_id = TradeId::new_checked(trade.tid.to_string())
95        .context("invalid trade identifier in Hyperliquid trade message")?;
96    let ts_event = parse_millis_to_nanos(trade.time);
97
98    TradeTick::new_checked(
99        instrument.id(),
100        price,
101        size,
102        aggressor,
103        trade_id,
104        ts_event,
105        ts_init,
106    )
107    .context("failed to construct TradeTick from Hyperliquid trade message")
108}
109
110/// Parses a WebSocket L2 order book message into [`OrderBookDeltas`].
111pub fn parse_ws_order_book_deltas(
112    book: &WsBookData,
113    instrument: &InstrumentAny,
114    ts_init: UnixNanos,
115) -> anyhow::Result<OrderBookDeltas> {
116    let ts_event = parse_millis_to_nanos(book.time);
117    let mut deltas = Vec::new();
118
119    // Treat every book payload as a snapshot: clear existing depth and rebuild it
120    deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
121
122    // Parse bids
123    for level in &book.levels[0] {
124        let price = parse_price(&level.px, instrument, "book.bid.px")?;
125        let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
126
127        if !size.is_positive() {
128            continue;
129        }
130
131        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
132
133        let delta = OrderBookDelta::new(
134            instrument.id(),
135            BookAction::Add,
136            order,
137            RecordFlag::F_LAST as u8,
138            0, // sequence
139            ts_event,
140            ts_init,
141        );
142
143        deltas.push(delta);
144    }
145
146    // Parse asks
147    for level in &book.levels[1] {
148        let price = parse_price(&level.px, instrument, "book.ask.px")?;
149        let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
150
151        if !size.is_positive() {
152            continue;
153        }
154
155        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
156
157        let delta = OrderBookDelta::new(
158            instrument.id(),
159            BookAction::Add,
160            order,
161            RecordFlag::F_LAST as u8,
162            0, // sequence
163            ts_event,
164            ts_init,
165        );
166
167        deltas.push(delta);
168    }
169
170    Ok(OrderBookDeltas::new(instrument.id(), deltas))
171}
172
173/// Parses a WebSocket BBO (best bid/offer) message into a [`QuoteTick`].
174pub fn parse_ws_quote_tick(
175    bbo: &WsBboData,
176    instrument: &InstrumentAny,
177    ts_init: UnixNanos,
178) -> anyhow::Result<QuoteTick> {
179    let bid_level = bbo.bbo[0]
180        .as_ref()
181        .context("BBO message missing bid level")?;
182    let ask_level = bbo.bbo[1]
183        .as_ref()
184        .context("BBO message missing ask level")?;
185
186    let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
187    let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
188    let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
189    let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
190
191    let ts_event = parse_millis_to_nanos(bbo.time);
192
193    QuoteTick::new_checked(
194        instrument.id(),
195        bid_price,
196        ask_price,
197        bid_size,
198        ask_size,
199        ts_event,
200        ts_init,
201    )
202    .context("failed to construct QuoteTick from Hyperliquid BBO message")
203}
204
205/// Parses a WebSocket candle message into a [`Bar`].
206pub fn parse_ws_candle(
207    candle: &CandleData,
208    instrument: &InstrumentAny,
209    bar_type: &BarType,
210    ts_init: UnixNanos,
211) -> anyhow::Result<Bar> {
212    let open = parse_price(&candle.o, instrument, "candle.o")?;
213    let high = parse_price(&candle.h, instrument, "candle.h")?;
214    let low = parse_price(&candle.l, instrument, "candle.l")?;
215    let close = parse_price(&candle.c, instrument, "candle.c")?;
216    let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
217
218    let ts_event = parse_millis_to_nanos(candle.t);
219
220    Ok(Bar::new(
221        *bar_type, open, high, low, close, volume, ts_event, ts_init,
222    ))
223}
224
225/// Parses a WebSocket order update message into an [`OrderStatusReport`].
226///
227/// This converts Hyperliquid order data from WebSocket into Nautilus order status reports.
228/// Handles both regular and conditional orders (stop/limit-if-touched).
229pub fn parse_ws_order_status_report(
230    order: &WsOrderData,
231    instrument: &InstrumentAny,
232    account_id: AccountId,
233    ts_init: UnixNanos,
234) -> anyhow::Result<OrderStatusReport> {
235    let instrument_id = instrument.id();
236    let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
237    let order_side = OrderSide::from(order.order.side);
238
239    // Determine order type based on trigger info
240    let order_type = if is_conditional_order_data(
241        order.order.trigger_px.as_deref(),
242        order.order.tpsl.as_ref(),
243    ) {
244        if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
245            parse_trigger_order_type(is_market, tpsl)
246        } else {
247            OrderType::Limit // fallback
248        }
249    } else {
250        OrderType::Limit // Regular limit order
251    };
252
253    let time_in_force = TimeInForce::Gtc;
254    let order_status = OrderStatus::from(order.status);
255    let quantity = parse_quantity(&order.order.sz, instrument, "order.sz")?;
256
257    // Calculate filled quantity (orig_sz - sz)
258    let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
259    let filled_qty = Quantity::from_raw(
260        orig_qty.raw.saturating_sub(quantity.raw),
261        instrument.size_precision(),
262    );
263
264    let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
265
266    let ts_accepted = parse_millis_to_nanos(order.order.timestamp);
267    let ts_last = parse_millis_to_nanos(order.status_timestamp);
268
269    let mut report = OrderStatusReport::new(
270        account_id,
271        instrument_id,
272        None, // venue_order_id_modified
273        venue_order_id,
274        order_side,
275        order_type,
276        time_in_force,
277        order_status,
278        quantity,
279        filled_qty,
280        ts_accepted,
281        ts_last,
282        ts_init,
283        Some(UUID4::new()),
284    );
285
286    if let Some(ref cloid) = order.order.cloid {
287        report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
288    }
289
290    report = report.with_price(price);
291
292    if let Some(ref trigger_px_str) = order.order.trigger_px {
293        let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
294        report = report.with_trigger_price(trigger_price);
295    }
296
297    Ok(report)
298}
299
300/// Parses a WebSocket fill message into a [`FillReport`].
301///
302/// This converts Hyperliquid fill data from WebSocket user events into Nautilus fill reports.
303pub fn parse_ws_fill_report(
304    fill: &WsFillData,
305    instrument: &InstrumentAny,
306    account_id: AccountId,
307    ts_init: UnixNanos,
308) -> anyhow::Result<FillReport> {
309    let instrument_id = instrument.id();
310    let venue_order_id = VenueOrderId::new(fill.oid.to_string());
311    let trade_id = TradeId::new_checked(fill.tid.to_string())
312        .context("invalid trade identifier in Hyperliquid fill message")?;
313
314    let order_side = OrderSide::from(fill.side);
315    let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
316    let last_px = parse_price(&fill.px, instrument, "fill.px")?;
317    let liquidity_side = if fill.crossed {
318        LiquiditySide::Taker
319    } else {
320        LiquiditySide::Maker
321    };
322
323    let commission_amount = Decimal::from_str(&fill.fee)
324        .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?
325        .abs()
326        .to_string()
327        .parse::<f64>()
328        .unwrap_or(0.0);
329
330    let commission_currency = if fill.fee_token == "USDC" {
331        Currency::from("USDC")
332    } else {
333        // Default to quote currency if fee_token is something else
334        instrument.quote_currency()
335    };
336
337    let commission = Money::new(commission_amount, commission_currency);
338    let ts_event = parse_millis_to_nanos(fill.time);
339
340    // No client order ID available in fill data directly
341    let client_order_id = None;
342
343    Ok(FillReport::new(
344        account_id,
345        instrument_id,
346        venue_order_id,
347        trade_id,
348        order_side,
349        last_qty,
350        last_px,
351        commission,
352        liquidity_side,
353        client_order_id,
354        None, // venue_position_id
355        ts_event,
356        ts_init,
357        None, // report_id
358    ))
359}
360
361/// Parses a WebSocket ActiveAssetCtx message into mark price, index price, and funding rate updates.
362///
363/// This converts Hyperliquid asset context data into Nautilus price and funding rate updates.
364/// Returns a tuple of (`MarkPriceUpdate`, `Option<IndexPriceUpdate>`, `Option<FundingRateUpdate>`).
365/// Index price and funding rate are only present for perpetual contracts.
366pub fn parse_ws_asset_context(
367    ctx: &WsActiveAssetCtxData,
368    instrument: &InstrumentAny,
369    ts_init: UnixNanos,
370) -> anyhow::Result<(
371    MarkPriceUpdate,
372    Option<IndexPriceUpdate>,
373    Option<FundingRateUpdate>,
374)> {
375    let instrument_id = instrument.id();
376
377    match ctx {
378        WsActiveAssetCtxData::Perp { coin: _, ctx } => {
379            let mark_px_f64 = ctx
380                .shared
381                .mark_px
382                .parse::<f64>()
383                .context("Failed to parse mark_px as f64")?;
384            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
385            let mark_price_update =
386                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
387
388            let oracle_px_f64 = ctx
389                .oracle_px
390                .parse::<f64>()
391                .context("Failed to parse oracle_px as f64")?;
392            let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
393            let index_price_update =
394                IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
395
396            let funding_f64 = ctx
397                .funding
398                .parse::<f64>()
399                .context("Failed to parse funding as f64")?;
400            let funding_rate_decimal = Decimal::from_f64(funding_f64)
401                .context("Failed to convert funding rate to Decimal")?;
402            let funding_rate_update = FundingRateUpdate::new(
403                instrument_id,
404                funding_rate_decimal,
405                None, // Hyperliquid doesn't provide next funding time in this message
406                ts_init,
407                ts_init,
408            );
409
410            Ok((
411                mark_price_update,
412                Some(index_price_update),
413                Some(funding_rate_update),
414            ))
415        }
416        WsActiveAssetCtxData::Spot { coin: _, ctx } => {
417            let mark_px_f64 = ctx
418                .shared
419                .mark_px
420                .parse::<f64>()
421                .context("Failed to parse mark_px as f64")?;
422            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
423            let mark_price_update =
424                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
425
426            Ok((mark_price_update, None, None))
427        }
428    }
429}
430
431/// Helper to parse an f64 price into a Price with instrument precision.
432fn parse_f64_price(
433    price: f64,
434    instrument: &InstrumentAny,
435    field_name: &str,
436) -> anyhow::Result<Price> {
437    if !price.is_finite() {
438        anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
439    }
440    Ok(Price::new(price, instrument.price_precision()))
441}
442
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// Builds the BTC-PERP instrument shared by all tests (price precision 2, size precision 3).
    fn btc_perp_instrument() -> InstrumentAny {
        let perp = CryptoPerpetual::new(
            InstrumentId::new(Symbol::new("BTC-PERP"), Venue::new("HYPERLIQUID")),
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false, // is_inverse
            2,     // price_precision
            3,     // size_precision
            Price::from("0.01"),
            Quantity::from("0.001"),
            None, // multiplier
            None, // lot_size
            None, // max_quantity
            None, // min_quantity
            None, // max_notional
            None, // min_notional
            None, // max_price
            None, // min_price
            None, // margin_init
            None, // margin_maint
            None, // maker_fee
            None, // taker_fee
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perp)
    }

    /// Builds a single-order book level with the given price and size.
    fn book_level(px: &str, sz: &str) -> WsLevelData {
        WsLevelData {
            px: String::from(px),
            sz: String::from(sz),
            n: 1,
        }
    }

    /// Builds the shared asset context used by both the perp and spot tests.
    fn shared_asset_ctx() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: String::from("1000000.0"),
            prev_day_px: String::from("49000.0"),
            mark_px: String::from("50000.0"),
            mid_px: Some(String::from("50001.0")),
            impact_pxs: Some(vec![String::from("50000.0"), String::from("50002.0")]),
            day_base_vlm: Some(String::from("100.0")),
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = btc_perp_instrument();

        let basic_order = WsBasicOrderData {
            coin: Ustr::from("BTC"),
            side: HyperliquidSide::Buy,
            limit_px: String::from("50000.0"),
            sz: String::from("0.5"),
            oid: 12345,
            timestamp: 1704470400000,
            orig_sz: String::from("1.0"),
            cloid: Some(String::from("test-order-1")),
            trigger_px: None,
            is_market: None,
            tpsl: None,
            trigger_activated: None,
            trailing_stop: None,
        };
        let order_update = WsOrderData {
            order: basic_order,
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let parsed = parse_ws_order_status_report(
            &order_update,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        );
        assert!(parsed.is_ok());

        let report = parsed.unwrap();
        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = btc_perp_instrument();

        let fill = WsFillData {
            coin: Ustr::from("BTC"),
            px: String::from("50000.0"),
            sz: String::from("0.1"),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: String::from("0.0"),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: String::from("0.0"),
            hash: String::from("0xabc123"),
            oid: 12345,
            crossed: true,
            fee: String::from("0.05"),
            tid: 98765,
            liquidation: None,
            fee_token: String::from("USDC"),
            builder_fee: None,
        };

        let parsed = parse_ws_fill_report(
            &fill,
            &instrument,
            AccountId::new("HYPERLIQUID-001"),
            UnixNanos::default(),
        );
        assert!(parsed.is_ok());

        let report = parsed.unwrap();
        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = btc_perp_instrument();

        let snapshot = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [
                vec![book_level("50000.0", "1.0")],
                vec![book_level("50001.0", "2.0")],
            ],
            time: 1_704_470_400_000,
        };

        let parsed =
            parse_ws_order_book_deltas(&snapshot, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(parsed.deltas.len(), 3); // clear + bid + ask
        assert_eq!(parsed.deltas[0].action, BookAction::Clear);

        let bid = &parsed.deltas[1];
        assert_eq!(bid.action, BookAction::Add);
        assert_eq!(bid.order.side, OrderSide::Buy);
        assert!(bid.order.size.is_positive());
        assert_eq!(bid.order.order_id, 0);

        let ask = &parsed.deltas[2];
        assert_eq!(ask.action, BookAction::Add);
        assert_eq!(ask.order.side, OrderSide::Sell);
        assert!(ask.order.size.is_positive());
        assert_eq!(ask.order.order_id, 0);
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = btc_perp_instrument();

        let perp_ctx = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: shared_asset_ctx(),
                funding: String::from("0.0001"),
                open_interest: String::from("100000.0"),
                oracle_px: String::from("50005.0"),
                premium: Some(String::from("-0.0001")),
            },
        };

        let parsed = parse_ws_asset_context(&perp_ctx, &instrument, UnixNanos::default());
        assert!(parsed.is_ok());

        let (mark, index, funding) = parsed.unwrap();

        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);

        assert!(index.is_some());
        let index = index.unwrap();
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        assert!(funding.is_some());
        let funding = funding.unwrap();
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = btc_perp_instrument();

        let spot_ctx = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: shared_asset_ctx(),
                circulating_supply: String::from("19000000.0"),
            },
        };

        let parsed = parse_ws_asset_context(&spot_ctx, &instrument, UnixNanos::default());
        assert!(parsed.is_ok());

        let (mark, index, funding) = parsed.unwrap();

        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);
        assert!(index.is_none());
        assert!(funding.is_none());
    }
}
675}