1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{
24 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26 },
27 enums::{
28 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29 TimeInForce,
30 },
31 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport},
34 types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{
37 Decimal,
38 prelude::{FromPrimitive, ToPrimitive},
39};
40
41use super::messages::{
42 CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
43};
44use crate::common::parse::{
45 is_conditional_order_data, parse_millis_to_nanos, parse_trigger_order_type,
46};
47
48fn parse_price(
50 price_str: &str,
51 instrument: &InstrumentAny,
52 field_name: &str,
53) -> anyhow::Result<Price> {
54 let decimal = Decimal::from_str(price_str)
55 .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
56
57 let value = decimal.to_f64().ok_or_else(|| {
58 anyhow::anyhow!(
59 "Failed to convert price '{}' to f64 for {} (out of range or too much precision)",
60 price_str,
61 field_name
62 )
63 })?;
64
65 Ok(Price::new(value, instrument.price_precision()))
66}
67
68fn parse_quantity(
70 quantity_str: &str,
71 instrument: &InstrumentAny,
72 field_name: &str,
73) -> anyhow::Result<Quantity> {
74 let decimal = Decimal::from_str(quantity_str).with_context(|| {
75 format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
76 })?;
77
78 let value = decimal.abs().to_f64().ok_or_else(|| {
79 anyhow::anyhow!(
80 "Failed to convert quantity '{}' to f64 for {} (out of range or too much precision)",
81 quantity_str,
82 field_name
83 )
84 })?;
85
86 Ok(Quantity::new(value, instrument.size_precision()))
87}
88
89pub fn parse_ws_trade_tick(
91 trade: &WsTradeData,
92 instrument: &InstrumentAny,
93 ts_init: UnixNanos,
94) -> anyhow::Result<TradeTick> {
95 let price = parse_price(&trade.px, instrument, "trade.px")?;
96 let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
97 let aggressor = AggressorSide::from(trade.side);
98 let trade_id = TradeId::new_checked(trade.tid.to_string())
99 .context("invalid trade identifier in Hyperliquid trade message")?;
100 let ts_event = parse_millis_to_nanos(trade.time);
101
102 TradeTick::new_checked(
103 instrument.id(),
104 price,
105 size,
106 aggressor,
107 trade_id,
108 ts_event,
109 ts_init,
110 )
111 .context("failed to construct TradeTick from Hyperliquid trade message")
112}
113
114pub fn parse_ws_order_book_deltas(
116 book: &WsBookData,
117 instrument: &InstrumentAny,
118 ts_init: UnixNanos,
119) -> anyhow::Result<OrderBookDeltas> {
120 let ts_event = parse_millis_to_nanos(book.time);
121 let mut deltas = Vec::new();
122
123 deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
125
126 for level in &book.levels[0] {
128 let price = parse_price(&level.px, instrument, "book.bid.px")?;
129 let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
130
131 if !size.is_positive() {
132 continue;
133 }
134
135 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
136
137 let delta = OrderBookDelta::new(
138 instrument.id(),
139 BookAction::Add,
140 order,
141 RecordFlag::F_LAST as u8,
142 0, ts_event,
144 ts_init,
145 );
146
147 deltas.push(delta);
148 }
149
150 for level in &book.levels[1] {
152 let price = parse_price(&level.px, instrument, "book.ask.px")?;
153 let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
154
155 if !size.is_positive() {
156 continue;
157 }
158
159 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
160
161 let delta = OrderBookDelta::new(
162 instrument.id(),
163 BookAction::Add,
164 order,
165 RecordFlag::F_LAST as u8,
166 0, ts_event,
168 ts_init,
169 );
170
171 deltas.push(delta);
172 }
173
174 Ok(OrderBookDeltas::new(instrument.id(), deltas))
175}
176
177pub fn parse_ws_quote_tick(
179 bbo: &WsBboData,
180 instrument: &InstrumentAny,
181 ts_init: UnixNanos,
182) -> anyhow::Result<QuoteTick> {
183 let bid_level = bbo.bbo[0]
184 .as_ref()
185 .context("BBO message missing bid level")?;
186 let ask_level = bbo.bbo[1]
187 .as_ref()
188 .context("BBO message missing ask level")?;
189
190 let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
191 let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
192 let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
193 let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
194
195 let ts_event = parse_millis_to_nanos(bbo.time);
196
197 QuoteTick::new_checked(
198 instrument.id(),
199 bid_price,
200 ask_price,
201 bid_size,
202 ask_size,
203 ts_event,
204 ts_init,
205 )
206 .context("failed to construct QuoteTick from Hyperliquid BBO message")
207}
208
209pub fn parse_ws_candle(
211 candle: &CandleData,
212 instrument: &InstrumentAny,
213 bar_type: &BarType,
214 ts_init: UnixNanos,
215) -> anyhow::Result<Bar> {
216 let open = parse_price(&candle.o, instrument, "candle.o")?;
217 let high = parse_price(&candle.h, instrument, "candle.h")?;
218 let low = parse_price(&candle.l, instrument, "candle.l")?;
219 let close = parse_price(&candle.c, instrument, "candle.c")?;
220 let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
221
222 let ts_event = parse_millis_to_nanos(candle.t);
223
224 Ok(Bar::new(
225 *bar_type, open, high, low, close, volume, ts_event, ts_init,
226 ))
227}
228
229pub fn parse_ws_order_status_report(
234 order: &WsOrderData,
235 instrument: &InstrumentAny,
236 account_id: AccountId,
237 ts_init: UnixNanos,
238) -> anyhow::Result<OrderStatusReport> {
239 let instrument_id = instrument.id();
240 let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
241 let order_side = OrderSide::from(order.order.side);
242
243 let order_type = if is_conditional_order_data(
245 order.order.trigger_px.as_deref(),
246 order.order.tpsl.as_ref(),
247 ) {
248 if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
249 parse_trigger_order_type(is_market, tpsl)
250 } else {
251 OrderType::Limit }
253 } else {
254 OrderType::Limit };
256
257 let time_in_force = TimeInForce::Gtc;
258 let order_status = OrderStatus::from(order.status);
259 let quantity = parse_quantity(&order.order.sz, instrument, "order.sz")?;
260
261 let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
263 let filled_qty = Quantity::from_raw(
264 orig_qty.raw.saturating_sub(quantity.raw),
265 instrument.size_precision(),
266 );
267
268 let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
269
270 let ts_accepted = parse_millis_to_nanos(order.order.timestamp);
271 let ts_last = parse_millis_to_nanos(order.status_timestamp);
272
273 let mut report = OrderStatusReport::new(
274 account_id,
275 instrument_id,
276 None, venue_order_id,
278 order_side,
279 order_type,
280 time_in_force,
281 order_status,
282 quantity,
283 filled_qty,
284 ts_accepted,
285 ts_last,
286 ts_init,
287 Some(UUID4::new()),
288 );
289
290 if let Some(ref cloid) = order.order.cloid {
291 report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
292 }
293
294 report = report.with_price(price);
295
296 if let Some(ref trigger_px_str) = order.order.trigger_px {
297 let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
298 report = report.with_trigger_price(trigger_price);
299 }
300
301 Ok(report)
302}
303
304pub fn parse_ws_fill_report(
308 fill: &WsFillData,
309 instrument: &InstrumentAny,
310 account_id: AccountId,
311 ts_init: UnixNanos,
312) -> anyhow::Result<FillReport> {
313 let instrument_id = instrument.id();
314 let venue_order_id = VenueOrderId::new(fill.oid.to_string());
315 let trade_id = TradeId::new_checked(fill.tid.to_string())
316 .context("invalid trade identifier in Hyperliquid fill message")?;
317
318 let order_side = OrderSide::from(fill.side);
319 let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
320 let last_px = parse_price(&fill.px, instrument, "fill.px")?;
321 let liquidity_side = if fill.crossed {
322 LiquiditySide::Taker
323 } else {
324 LiquiditySide::Maker
325 };
326
327 let commission_amount = Decimal::from_str(&fill.fee)
328 .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?
329 .abs()
330 .to_string()
331 .parse::<f64>()
332 .unwrap_or(0.0);
333
334 let commission_currency = if fill.fee_token == "USDC" {
335 Currency::from("USDC")
336 } else {
337 instrument.quote_currency()
339 };
340
341 let commission = Money::new(commission_amount, commission_currency);
342 let ts_event = parse_millis_to_nanos(fill.time);
343
344 let client_order_id = None;
346
347 Ok(FillReport::new(
348 account_id,
349 instrument_id,
350 venue_order_id,
351 trade_id,
352 order_side,
353 last_qty,
354 last_px,
355 commission,
356 liquidity_side,
357 client_order_id,
358 None, ts_event,
360 ts_init,
361 None, ))
363}
364
365pub fn parse_ws_asset_context(
371 ctx: &WsActiveAssetCtxData,
372 instrument: &InstrumentAny,
373 ts_init: UnixNanos,
374) -> anyhow::Result<(
375 MarkPriceUpdate,
376 Option<IndexPriceUpdate>,
377 Option<FundingRateUpdate>,
378)> {
379 let instrument_id = instrument.id();
380
381 match ctx {
382 WsActiveAssetCtxData::Perp { coin: _, ctx } => {
383 let mark_px_f64 = ctx
384 .shared
385 .mark_px
386 .parse::<f64>()
387 .context("Failed to parse mark_px as f64")?;
388 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
389 let mark_price_update =
390 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
391
392 let oracle_px_f64 = ctx
393 .oracle_px
394 .parse::<f64>()
395 .context("Failed to parse oracle_px as f64")?;
396 let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
397 let index_price_update =
398 IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
399
400 let funding_f64 = ctx
401 .funding
402 .parse::<f64>()
403 .context("Failed to parse funding as f64")?;
404 let funding_rate_decimal = Decimal::from_f64(funding_f64)
405 .context("Failed to convert funding rate to Decimal")?;
406 let funding_rate_update = FundingRateUpdate::new(
407 instrument_id,
408 funding_rate_decimal,
409 None, ts_init,
411 ts_init,
412 );
413
414 Ok((
415 mark_price_update,
416 Some(index_price_update),
417 Some(funding_rate_update),
418 ))
419 }
420 WsActiveAssetCtxData::Spot { coin: _, ctx } => {
421 let mark_px_f64 = ctx
422 .shared
423 .mark_px
424 .parse::<f64>()
425 .context("Failed to parse mark_px as f64")?;
426 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
427 let mark_price_update =
428 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
429
430 Ok((mark_price_update, None, None))
431 }
432 }
433}
434
435fn parse_f64_price(
437 price: f64,
438 instrument: &InstrumentAny,
439 field_name: &str,
440) -> anyhow::Result<Price> {
441 if !price.is_finite() {
442 anyhow::bail!(
443 "Invalid price value for {}: {} (must be finite)",
444 field_name,
445 price
446 );
447 }
448 Ok(Price::new(price, instrument.price_precision()))
449}
450
451#[cfg(test)]
456mod tests {
457 use nautilus_model::{
458 identifiers::{InstrumentId, Symbol, Venue},
459 instruments::CryptoPerpetual,
460 types::currency::Currency,
461 };
462 use rstest::rstest;
463 use ustr::Ustr;
464
465 use super::*;
466 use crate::{
467 common::enums::{
468 HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
469 HyperliquidSide,
470 },
471 websocket::messages::{
472 PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
473 },
474 };
475
476 fn create_test_instrument() -> InstrumentAny {
477 let instrument_id = InstrumentId::new(Symbol::new("BTC-PERP"), Venue::new("HYPERLIQUID"));
478
479 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
480 instrument_id,
481 Symbol::new("BTC-PERP"),
482 Currency::from("BTC"),
483 Currency::from("USDC"),
484 Currency::from("USDC"),
485 false, 2, 3, Price::from("0.01"),
489 Quantity::from("0.001"),
490 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
503 UnixNanos::default(),
504 ))
505 }
506
507 #[rstest]
508 fn test_parse_ws_order_status_report_basic() {
509 let instrument = create_test_instrument();
510 let account_id = AccountId::new("HYPERLIQUID-001");
511 let ts_init = UnixNanos::default();
512
513 let order_data = WsOrderData {
514 order: WsBasicOrderData {
515 coin: Ustr::from("BTC"),
516 side: HyperliquidSide::Buy,
517 limit_px: "50000.0".to_string(),
518 sz: "0.5".to_string(),
519 oid: 12345,
520 timestamp: 1704470400000,
521 orig_sz: "1.0".to_string(),
522 cloid: Some("test-order-1".to_string()),
523 trigger_px: None,
524 is_market: None,
525 tpsl: None,
526 trigger_activated: None,
527 trailing_stop: None,
528 },
529 status: HyperliquidOrderStatusEnum::Open,
530 status_timestamp: 1704470400000,
531 };
532
533 let result = parse_ws_order_status_report(&order_data, &instrument, account_id, ts_init);
534 assert!(result.is_ok());
535
536 let report = result.unwrap();
537 assert_eq!(report.order_side, OrderSide::Buy);
538 assert_eq!(report.order_type, OrderType::Limit);
539 assert_eq!(
540 report.order_status,
541 nautilus_model::enums::OrderStatus::Accepted
542 );
543 }
544
545 #[rstest]
546 fn test_parse_ws_fill_report_basic() {
547 let instrument = create_test_instrument();
548 let account_id = AccountId::new("HYPERLIQUID-001");
549 let ts_init = UnixNanos::default();
550
551 let fill_data = WsFillData {
552 coin: Ustr::from("BTC"),
553 px: "50000.0".to_string(),
554 sz: "0.1".to_string(),
555 side: HyperliquidSide::Buy,
556 time: 1704470400000,
557 start_position: "0.0".to_string(),
558 dir: HyperliquidFillDirection::OpenLong,
559 closed_pnl: "0.0".to_string(),
560 hash: "0xabc123".to_string(),
561 oid: 12345,
562 crossed: true,
563 fee: "0.05".to_string(),
564 tid: 98765,
565 liquidation: None,
566 fee_token: "USDC".to_string(),
567 builder_fee: None,
568 };
569
570 let result = parse_ws_fill_report(&fill_data, &instrument, account_id, ts_init);
571 assert!(result.is_ok());
572
573 let report = result.unwrap();
574 assert_eq!(report.order_side, OrderSide::Buy);
575 assert_eq!(report.liquidity_side, LiquiditySide::Taker);
576 }
577
578 #[rstest]
579 fn test_parse_ws_order_book_deltas_snapshot_behavior() {
580 let instrument = create_test_instrument();
581 let ts_init = UnixNanos::default();
582
583 let book = WsBookData {
584 coin: Ustr::from("BTC"),
585 levels: [
586 vec![WsLevelData {
587 px: "50000.0".to_string(),
588 sz: "1.0".to_string(),
589 n: 1,
590 }],
591 vec![WsLevelData {
592 px: "50001.0".to_string(),
593 sz: "2.0".to_string(),
594 n: 1,
595 }],
596 ],
597 time: 1_704_470_400_000,
598 };
599
600 let deltas = parse_ws_order_book_deltas(&book, &instrument, ts_init).unwrap();
601
602 assert_eq!(deltas.deltas.len(), 3); assert_eq!(deltas.deltas[0].action, BookAction::Clear);
604
605 let bid_delta = &deltas.deltas[1];
606 assert_eq!(bid_delta.action, BookAction::Add);
607 assert_eq!(bid_delta.order.side, OrderSide::Buy);
608 assert!(bid_delta.order.size.is_positive());
609 assert_eq!(bid_delta.order.order_id, 0);
610
611 let ask_delta = &deltas.deltas[2];
612 assert_eq!(ask_delta.action, BookAction::Add);
613 assert_eq!(ask_delta.order.side, OrderSide::Sell);
614 assert!(ask_delta.order.size.is_positive());
615 assert_eq!(ask_delta.order.order_id, 0);
616 }
617
618 #[rstest]
619 fn test_parse_ws_asset_context_perp() {
620 let instrument = create_test_instrument();
621 let ts_init = UnixNanos::default();
622
623 let ctx_data = WsActiveAssetCtxData::Perp {
624 coin: Ustr::from("BTC"),
625 ctx: PerpsAssetCtx {
626 shared: SharedAssetCtx {
627 day_ntl_vlm: "1000000.0".to_string(),
628 prev_day_px: "49000.0".to_string(),
629 mark_px: "50000.0".to_string(),
630 mid_px: Some("50001.0".to_string()),
631 impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
632 day_base_vlm: Some("100.0".to_string()),
633 },
634 funding: "0.0001".to_string(),
635 open_interest: "100000.0".to_string(),
636 oracle_px: "50005.0".to_string(),
637 premium: Some("-0.0001".to_string()),
638 },
639 };
640
641 let result = parse_ws_asset_context(&ctx_data, &instrument, ts_init);
642 assert!(result.is_ok());
643
644 let (mark_price, index_price, funding_rate) = result.unwrap();
645
646 assert_eq!(mark_price.instrument_id, instrument.id());
647 assert_eq!(mark_price.value.as_f64(), 50_000.0);
648
649 assert!(index_price.is_some());
650 let index = index_price.unwrap();
651 assert_eq!(index.instrument_id, instrument.id());
652 assert_eq!(index.value.as_f64(), 50_005.0);
653
654 assert!(funding_rate.is_some());
655 let funding = funding_rate.unwrap();
656 assert_eq!(funding.instrument_id, instrument.id());
657 assert_eq!(funding.rate.to_string(), "0.0001");
658 }
659
660 #[rstest]
661 fn test_parse_ws_asset_context_spot() {
662 let instrument = create_test_instrument();
663 let ts_init = UnixNanos::default();
664
665 let ctx_data = WsActiveAssetCtxData::Spot {
666 coin: Ustr::from("BTC"),
667 ctx: SpotAssetCtx {
668 shared: SharedAssetCtx {
669 day_ntl_vlm: "1000000.0".to_string(),
670 prev_day_px: "49000.0".to_string(),
671 mark_px: "50000.0".to_string(),
672 mid_px: Some("50001.0".to_string()),
673 impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
674 day_base_vlm: Some("100.0".to_string()),
675 },
676 circulating_supply: "19000000.0".to_string(),
677 },
678 };
679
680 let result = parse_ws_asset_context(&ctx_data, &instrument, ts_init);
681 assert!(result.is_ok());
682
683 let (mark_price, index_price, funding_rate) = result.unwrap();
684
685 assert_eq!(mark_price.instrument_id, instrument.id());
686 assert_eq!(mark_price.value.as_f64(), 50_000.0);
687 assert!(index_price.is_none());
688 assert!(funding_rate.is_none());
689 }
690}