1use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use dashmap::DashMap;
19use futures_util::{Stream, future::BoxFuture};
20#[cfg(feature = "python")]
21use nautilus_core::python::to_pyruntime_err;
22use nautilus_core::time::get_atomic_clock_realtime;
23#[cfg(feature = "python")]
24use nautilus_model::{
25 data::{BarType, Data, OrderBookDeltas_API},
26 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
27};
28use nautilus_model::{
29 identifiers::{AccountId, InstrumentId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
33#[cfg(feature = "python")]
34use pyo3::{exceptions::PyRuntimeError, prelude::*};
35use tokio::sync::{RwLock, mpsc};
36use tokio_tungstenite::tungstenite::Message;
37use ustr::Ustr;
38
39#[cfg(feature = "python")]
40use crate::websocket::parse::{
41 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
42};
43use crate::{
44 http::error::{Error, Result as HyperliquidResult},
45 websocket::{
46 messages::{
47 ActionPayload, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest,
48 NautilusWsMessage, PostRequest, PostResponsePayload, SubscriptionRequest,
49 },
50 parse::{parse_ws_fill_report, parse_ws_order_status_report},
51 post::{
52 PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
53 },
54 },
55};
56
57#[derive(Debug, Clone, thiserror::Error)]
59pub enum HyperliquidError {
60 #[error("URL parsing failed: {0}")]
61 UrlParsing(String),
62
63 #[error("Message serialization failed: {0}")]
64 MessageSerialization(String),
65
66 #[error("Message deserialization failed: {0}")]
67 MessageDeserialization(String),
68
69 #[error("WebSocket connection failed: {0}")]
70 Connection(String),
71
72 #[error("Channel send failed: {0}")]
73 ChannelSend(String),
74}
75
76#[derive(Debug, Default)]
81pub struct HyperliquidCodec;
82
83impl HyperliquidCodec {
84 pub fn new() -> Self {
86 Self
87 }
88
89 pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
91 if url.starts_with("ws://") || url.starts_with("wss://") {
92 Ok(())
93 } else {
94 Err(HyperliquidError::UrlParsing(format!(
95 "URL must start with ws:// or wss://, was: {}",
96 url
97 )))
98 }
99 }
100
101 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
103 serde_json::to_vec(request).map_err(|e| {
104 HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
105 })
106 }
107
108 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
110 serde_json::from_slice(data).map_err(|e| {
111 HyperliquidError::MessageDeserialization(format!(
112 "Failed to deserialize message: {}",
113 e
114 ))
115 })
116 }
117}
118
119#[derive(Debug)]
124pub struct HyperliquidWebSocketInnerClient {
125 inner: Arc<WebSocketClient>,
126 rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
127 sent_subscriptions: HashSet<String>,
128 _reader_task: tokio::task::JoinHandle<()>,
129 post_router: Arc<PostRouter>,
130 post_ids: PostIds,
131 #[allow(dead_code, reason = "Reserved for future direct WebSocket operations")]
132 ws_sender: WsSender,
133 post_batcher: PostBatcher,
134}
135
136impl HyperliquidWebSocketInnerClient {
137 pub async fn connect(url: &str) -> anyhow::Result<Self> {
140 let (message_handler, mut raw_rx) = channel_message_handler();
142
143 let cfg = WebSocketConfig {
144 url: url.to_string(),
145 headers: vec![],
146 message_handler: Some(message_handler),
147 heartbeat: Some(20), heartbeat_msg: None, ping_handler: None,
150 reconnect_timeout_ms: Some(15_000),
151 reconnect_delay_initial_ms: Some(250),
152 reconnect_delay_max_ms: Some(5_000),
153 reconnect_backoff_factor: Some(2.0),
154 reconnect_jitter_ms: Some(200),
155 };
156
157 let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
158 tracing::info!("Hyperliquid WebSocket connected: {}", url);
159
160 let post_router = PostRouter::new();
161 let post_ids = PostIds::new(1);
162 let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
163 let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
164
165 let ws_sender = WsSender::new(tx_outbound);
166
167 let post_router_for_reader = Arc::clone(&post_router);
169 let reader_task = tokio::spawn(async move {
170 while let Some(msg) = raw_rx.recv().await {
171 match msg {
172 Message::Text(txt) => {
173 tracing::debug!("Received WS text: {}", txt);
174 match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
175 Ok(hl_msg) => {
176 if let HyperliquidWsMessage::Post { data } = &hl_msg {
177 post_router_for_reader.complete(data.clone()).await;
179 }
180 if let Err(e) = tx_inbound.send(hl_msg).await {
181 tracing::error!("Failed to send decoded message: {}", e);
182 break;
183 }
184 }
185 Err(err) => {
186 tracing::error!(
187 "Failed to decode Hyperliquid message: {} | text: {}",
188 err,
189 txt
190 );
191 }
192 }
193 }
194 Message::Binary(data) => {
195 tracing::debug!("Received binary message ({} bytes), ignoring", data.len())
196 }
197 Message::Ping(data) => {
198 tracing::debug!("Received ping frame ({} bytes)", data.len())
199 }
200 Message::Pong(data) => {
201 tracing::debug!("Received pong frame ({} bytes)", data.len())
202 }
203 Message::Close(close_frame) => {
204 tracing::info!("Received close frame: {:?}", close_frame);
205 break;
206 }
207 Message::Frame(_) => tracing::warn!("Received raw frame (unexpected)"),
208 }
209 }
210 tracing::info!("Hyperliquid WebSocket reader finished");
211 });
212
213 let client_for_sender = Arc::clone(&client);
215 tokio::spawn(async move {
216 while let Some(req) = rx_outbound.recv().await {
217 let json = match serde_json::to_string(&req) {
218 Ok(json) => json,
219 Err(e) => {
220 tracing::error!("Failed to serialize WS request: {}", e);
221 continue;
222 }
223 };
224 tracing::debug!("Sending WS message: {}", json);
225 if let Err(e) = client_for_sender.send_text(json, None).await {
226 tracing::error!("Failed to send WS message: {}", e);
227 break;
228 }
229 }
230 tracing::info!("WebSocket sender task finished");
231 });
232
233 let ws_sender_for_batcher = ws_sender.clone();
235
236 let send_fn =
237 move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
238 let sender = ws_sender_for_batcher.clone();
239 Box::pin(async move { sender.send(req).await })
240 };
241
242 let post_batcher = PostBatcher::new(send_fn);
243
244 let hl_client = Self {
245 inner: client,
246 rx_inbound,
247 sent_subscriptions: HashSet::new(),
248 _reader_task: reader_task,
249 post_router,
250 post_ids,
251 ws_sender,
252 post_batcher,
253 };
254
255 Ok(hl_client)
256 }
257
258 pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
260 let json = serde_json::to_string(request)?;
261 tracing::debug!("Sending WS message: {}", json);
262 self.inner
263 .send_text(json, None)
264 .await
265 .map_err(|e| anyhow::anyhow!(e))
266 }
267
268 pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
270 let json = serde_json::to_string(request)?;
271 if self.sent_subscriptions.contains(&json) {
272 tracing::debug!("Skipping duplicate request: {}", json);
273 return Ok(());
274 }
275
276 tracing::debug!("Sending WS message: {}", json);
277 self.inner
278 .send_text(json.clone(), None)
279 .await
280 .map_err(|e| anyhow::anyhow!(e))?;
281
282 self.sent_subscriptions.insert(json);
283 Ok(())
284 }
285
286 pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> anyhow::Result<()> {
288 let request = HyperliquidWsRequest::Subscribe { subscription };
289 self.ws_send_once(&request).await
290 }
291
292 pub async fn ws_unsubscribe(
294 &mut self,
295 subscription: SubscriptionRequest,
296 ) -> anyhow::Result<()> {
297 let request = HyperliquidWsRequest::Unsubscribe { subscription };
298 self.ws_send(&request).await
299 }
300
301 pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
304 self.rx_inbound.recv().await
305 }
306
307 pub fn is_active(&self) -> bool {
309 self.inner.is_active()
310 }
311
312 pub fn is_reconnecting(&self) -> bool {
314 self.inner.is_reconnecting()
315 }
316
317 pub fn is_disconnecting(&self) -> bool {
319 self.inner.is_disconnecting()
320 }
321
322 pub fn is_closed(&self) -> bool {
324 self.inner.is_closed()
325 }
326
327 pub async fn ws_disconnect(&mut self) -> anyhow::Result<()> {
329 self.inner.disconnect().await;
330 Ok(())
331 }
332
333 async fn enqueue_post(
335 &self,
336 id: u64,
337 request: PostRequest,
338 lane: PostLane,
339 ) -> HyperliquidResult<()> {
340 self.post_batcher
341 .enqueue(ScheduledPost { id, request, lane })
342 .await
343 }
344
345 pub async fn post_info_raw(
347 &self,
348 payload: serde_json::Value,
349 timeout: Duration,
350 ) -> HyperliquidResult<PostResponsePayload> {
351 let id = self.post_ids.next();
352 let rx = self.post_router.register(id).await?;
353 self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
354 .await?;
355 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
356 Ok(resp.response)
357 }
358
359 pub async fn post_action_raw(
361 &self,
362 action: ActionPayload,
363 timeout: Duration,
364 ) -> HyperliquidResult<PostResponsePayload> {
365 let id = self.post_ids.next();
366 let rx = self.post_router.register(id).await?;
367 let lane = lane_for_action(&action.action);
368 self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
369 .await?;
370 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
371 Ok(resp.response)
372 }
373
374 pub async fn info_l2_book(
376 &self,
377 coin: &str,
378 timeout: Duration,
379 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
380 let payload = match self
381 .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
382 .await?
383 {
384 PostResponsePayload::Info { payload } => payload,
385 PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
386 PostResponsePayload::Action { .. } => {
387 return Err(Error::decode("expected info payload, was action"));
388 }
389 };
390 serde_json::from_value(payload).map_err(Error::Serde)
391 }
392}
393
394#[derive(Clone, Debug)]
399#[cfg_attr(
400 feature = "python",
401 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
402)]
403pub struct HyperliquidWebSocketClient {
404 inner: Arc<RwLock<Option<HyperliquidWebSocketInnerClient>>>,
405 url: String,
406 instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
407}
408
409impl HyperliquidWebSocketClient {
410 pub fn new(url: String) -> Self {
413 Self {
414 inner: Arc::new(RwLock::new(None)),
415 url,
416 instruments: Arc::new(DashMap::new()),
417 }
418 }
419
420 pub fn add_instrument(&self, instrument: InstrumentAny) {
422 self.instruments.insert(instrument.id(), instrument);
423 }
424
425 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
427 self.instruments.get(id).map(|e| e.value().clone())
428 }
429
430 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
432 self.instruments
433 .iter()
434 .find(|e| e.key().symbol == (*symbol).into())
435 .map(|e| e.value().clone())
436 }
437
438 pub async fn connect(url: &str) -> anyhow::Result<Self> {
440 let inner_client = HyperliquidWebSocketInnerClient::connect(url).await?;
441 Ok(Self {
442 inner: Arc::new(RwLock::new(Some(inner_client))),
443 url: url.to_string(),
444 instruments: Arc::new(DashMap::new()),
445 })
446 }
447
448 pub async fn ensure_connected(&self) -> anyhow::Result<()> {
450 let mut inner = self.inner.write().await;
451 if inner.is_none() {
452 let inner_client = HyperliquidWebSocketInnerClient::connect(&self.url).await?;
453 *inner = Some(inner_client);
454 }
455 Ok(())
456 }
457
458 pub async fn is_connected(&self) -> bool {
460 let inner = self.inner.read().await;
461 inner.is_some()
462 }
463
464 pub fn url(&self) -> &str {
466 &self.url
467 }
468
469 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
473 self.ensure_connected().await?;
474 let subscription = SubscriptionRequest::OrderUpdates {
475 user: user.to_string(),
476 };
477 let mut inner = self.inner.write().await;
478 inner
479 .as_mut()
480 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
481 .ws_subscribe(subscription)
482 .await
483 }
484
485 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
489 self.ensure_connected().await?;
490 let subscription = SubscriptionRequest::UserEvents {
491 user: user.to_string(),
492 };
493 let mut inner = self.inner.write().await;
494 inner
495 .as_mut()
496 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
497 .ws_subscribe(subscription)
498 .await
499 }
500
501 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
503 self.subscribe_order_updates(user).await?;
504 self.subscribe_user_events(user).await?;
505 Ok(())
506 }
507
508 pub async fn subscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
512 self.ensure_connected().await?;
513 let subscription = SubscriptionRequest::Trades { coin };
514 let mut inner = self.inner.write().await;
515 inner
516 .as_mut()
517 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
518 .ws_subscribe(subscription)
519 .await
520 }
521
522 pub async fn unsubscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
526 self.ensure_connected().await?;
527 let subscription = SubscriptionRequest::Trades { coin };
528 let mut inner = self.inner.write().await;
529 inner
530 .as_mut()
531 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
532 .ws_unsubscribe(subscription)
533 .await
534 }
535
536 pub async fn subscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
540 self.ensure_connected().await?;
541 let subscription = SubscriptionRequest::L2Book {
542 coin,
543 n_sig_figs: None,
544 mantissa: None,
545 };
546 let mut inner = self.inner.write().await;
547 inner
548 .as_mut()
549 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
550 .ws_subscribe(subscription)
551 .await
552 }
553
554 pub async fn unsubscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
558 self.ensure_connected().await?;
559 let subscription = SubscriptionRequest::L2Book {
560 coin,
561 n_sig_figs: None,
562 mantissa: None,
563 };
564 let mut inner = self.inner.write().await;
565 inner
566 .as_mut()
567 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
568 .ws_unsubscribe(subscription)
569 .await
570 }
571
572 pub async fn subscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
576 self.ensure_connected().await?;
577 let subscription = SubscriptionRequest::Bbo { coin };
578 let mut inner = self.inner.write().await;
579 inner
580 .as_mut()
581 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
582 .ws_subscribe(subscription)
583 .await
584 }
585
586 pub async fn unsubscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
590 self.ensure_connected().await?;
591 let subscription = SubscriptionRequest::Bbo { coin };
592 let mut inner = self.inner.write().await;
593 inner
594 .as_mut()
595 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
596 .ws_unsubscribe(subscription)
597 .await
598 }
599
600 pub async fn subscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
604 self.ensure_connected().await?;
605 let subscription = SubscriptionRequest::Candle { coin, interval };
606 let mut inner = self.inner.write().await;
607 inner
608 .as_mut()
609 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
610 .ws_subscribe(subscription)
611 .await
612 }
613
614 pub async fn unsubscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
618 self.ensure_connected().await?;
619 let subscription = SubscriptionRequest::Candle { coin, interval };
620 let mut inner = self.inner.write().await;
621 inner
622 .as_mut()
623 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
624 .ws_unsubscribe(subscription)
625 .await
626 }
627
628 pub async fn next_event(&self) -> Option<HyperliquidWsMessage> {
631 let mut inner = self.inner.write().await;
632 if let Some(ref mut client) = *inner {
633 client.ws_next_event().await
634 } else {
635 None
636 }
637 }
638
639 pub async fn is_active(&self) -> bool {
641 let inner = self.inner.read().await;
642 inner.as_ref().is_some_and(|client| client.is_active())
643 }
644
645 pub async fn is_reconnecting(&self) -> bool {
647 let inner = self.inner.read().await;
648 inner
649 .as_ref()
650 .is_some_and(|client| client.is_reconnecting())
651 }
652
653 pub async fn is_disconnecting(&self) -> bool {
655 let inner = self.inner.read().await;
656 inner
657 .as_ref()
658 .is_some_and(|client| client.is_disconnecting())
659 }
660
661 pub async fn is_closed(&self) -> bool {
663 let inner = self.inner.read().await;
664 inner.as_ref().is_none_or(|client| client.is_closed())
665 }
666
667 pub async fn disconnect(&self) -> anyhow::Result<()> {
669 let mut inner = self.inner.write().await;
670 if let Some(ref mut client) = *inner {
671 client.ws_disconnect().await
672 } else {
673 Ok(())
674 }
675 }
676
677 pub async fn send_raw(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
681 self.ensure_connected().await?;
682 let mut inner = self.inner.write().await;
683 inner
684 .as_mut()
685 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
686 .ws_send(request)
687 .await
688 }
689
690 pub async fn info_l2_book(
694 &self,
695 coin: &str,
696 timeout: Duration,
697 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
698 self.ensure_connected().await.map_err(|e| Error::Http {
699 status: 500,
700 message: e.to_string(),
701 })?;
702 let mut inner = self.inner.write().await;
703 inner
704 .as_mut()
705 .ok_or_else(|| Error::Http {
706 status: 500,
707 message: "Client not connected".to_string(),
708 })?
709 .info_l2_book(coin, timeout)
710 .await
711 }
712
713 pub async fn post_info_raw(
717 &self,
718 payload: serde_json::Value,
719 timeout: Duration,
720 ) -> HyperliquidResult<PostResponsePayload> {
721 self.ensure_connected().await.map_err(|e| Error::Http {
722 status: 500,
723 message: e.to_string(),
724 })?;
725 let mut inner = self.inner.write().await;
726 inner
727 .as_mut()
728 .ok_or_else(|| Error::Http {
729 status: 500,
730 message: "Client not connected".to_string(),
731 })?
732 .post_info_raw(payload, timeout)
733 .await
734 }
735
736 pub async fn post_action_raw(
740 &self,
741 action: ActionPayload,
742 timeout: Duration,
743 ) -> HyperliquidResult<PostResponsePayload> {
744 self.ensure_connected().await.map_err(|e| Error::Http {
745 status: 500,
746 message: e.to_string(),
747 })?;
748 let mut inner = self.inner.write().await;
749 inner
750 .as_mut()
751 .ok_or_else(|| Error::Http {
752 status: 500,
753 message: "Client not connected".to_string(),
754 })?
755 .post_action_raw(action, timeout)
756 .await
757 }
758
759 pub async fn stream_execution_messages(
779 &self,
780 account_id: AccountId,
781 user_address: String,
782 ) -> anyhow::Result<impl Stream<Item = NautilusWsMessage>> {
783 self.ensure_connected().await?;
785
786 self.subscribe_order_updates(&user_address).await?;
788 self.subscribe_user_events(&user_address).await?;
789
790 let client = self.clone();
791 let (tx, rx) = mpsc::unbounded_channel();
792
793 tokio::spawn(async move {
795 let clock = get_atomic_clock_realtime();
796
797 loop {
798 let event = client.next_event().await;
799
800 match event {
801 Some(msg) => {
802 match &msg {
803 HyperliquidWsMessage::OrderUpdates { data } => {
804 let mut exec_reports = Vec::new();
805
806 for order_update in data {
808 if let Some(instrument) =
809 client.get_instrument_by_symbol(&order_update.order.coin)
810 {
811 let ts_init = clock.get_time_ns();
812
813 match parse_ws_order_status_report(
814 order_update,
815 &instrument,
816 account_id,
817 ts_init,
818 ) {
819 Ok(report) => {
820 exec_reports.push(ExecutionReport::Order(report));
821 }
822 Err(e) => {
823 tracing::error!(
824 "Error parsing order update: {}",
825 e
826 );
827 }
828 }
829 } else {
830 tracing::warn!(
831 "No instrument found for symbol: {}",
832 order_update.order.coin
833 );
834 }
835 }
836
837 if !exec_reports.is_empty()
839 && let Err(e) =
840 tx.send(NautilusWsMessage::ExecutionReports(exec_reports))
841 {
842 tracing::error!("Failed to send execution reports: {}", e);
843 break;
844 }
845 }
846 HyperliquidWsMessage::UserEvents { data } => {
847 use crate::websocket::messages::WsUserEventData;
848
849 let ts_init = clock.get_time_ns();
850
851 match data {
852 WsUserEventData::Fills { fills } => {
853 let mut exec_reports = Vec::new();
854
855 for fill in fills {
857 if let Some(instrument) =
858 client.get_instrument_by_symbol(&fill.coin)
859 {
860 match parse_ws_fill_report(
861 fill,
862 &instrument,
863 account_id,
864 ts_init,
865 ) {
866 Ok(report) => {
867 exec_reports
868 .push(ExecutionReport::Fill(report));
869 }
870 Err(e) => {
871 tracing::error!(
872 "Error parsing fill: {}",
873 e
874 );
875 }
876 }
877 } else {
878 tracing::warn!(
879 "No instrument found for symbol: {}",
880 fill.coin
881 );
882 }
883 }
884
885 if !exec_reports.is_empty()
887 && let Err(e) = tx.send(
888 NautilusWsMessage::ExecutionReports(exec_reports),
889 )
890 {
891 tracing::error!("Failed to send fill reports: {}", e);
892 break;
893 }
894 }
895 _ => {
896 }
898 }
899 }
900 _ => {
901 }
903 }
904 }
905 None => {
906 break;
908 }
909 }
910 }
911 });
912
913 Ok(async_stream::stream! {
915 let mut rx = rx;
916 while let Some(msg) = rx.recv().await {
917 yield msg;
918 }
919 })
920 }
921}
922
923#[cfg(feature = "python")]
925#[pyo3::pymethods]
926impl HyperliquidWebSocketClient {
927 #[new]
928 #[pyo3(signature = (url))]
929 fn py_new(url: String) -> PyResult<Self> {
930 Ok(Self::new(url))
931 }
932
933 #[getter]
934 #[pyo3(name = "url")]
935 #[must_use]
936 pub fn py_url(&self) -> String {
937 self.url().to_string()
938 }
939
940 #[pyo3(name = "is_active")]
941 fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
942 let client = self.clone();
943 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
944 }
945
946 #[pyo3(name = "is_closed")]
947 fn py_is_closed<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
948 let client = self.clone();
949 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_closed().await) })
950 }
951
952 #[pyo3(name = "connect")]
953 fn py_connect<'py>(
954 &self,
955 py: Python<'py>,
956 instruments: Vec<Py<PyAny>>,
957 callback: Py<PyAny>,
958 ) -> PyResult<Bound<'py, PyAny>> {
959 for inst in instruments {
961 let inst_any = pyobject_to_instrument_any(py, inst)?;
962 self.add_instrument(inst_any);
963 }
964
965 let client = self.clone();
966
967 pyo3_async_runtimes::tokio::future_into_py(py, async move {
968 client.ensure_connected().await.map_err(to_pyruntime_err)?;
969
970 tokio::spawn(async move {
972 let clock = get_atomic_clock_realtime();
973
974 loop {
975 let event = client.next_event().await;
976
977 match event {
978 Some(msg) => {
979 tracing::debug!("Received WebSocket message: {:?}", msg);
980
981 match msg {
983 HyperliquidWsMessage::Trades { data } => {
984 for trade in data {
985 if let Some(instrument) =
986 client.get_instrument_by_symbol(&trade.coin)
987 {
988 let ts_init = clock.get_time_ns();
989 match parse_ws_trade_tick(&trade, &instrument, ts_init)
990 {
991 Ok(tick) => {
992 Python::attach(|py| {
993 let py_obj = data_to_pycapsule(
994 py,
995 Data::Trade(tick),
996 );
997 if let Err(e) =
998 callback.bind(py).call1((py_obj,))
999 {
1000 tracing::error!(
1001 "Error calling Python callback: {}",
1002 e
1003 );
1004 }
1005 });
1006 }
1007 Err(e) => {
1008 tracing::error!(
1009 "Error parsing trade tick: {}",
1010 e
1011 );
1012 }
1013 }
1014 } else {
1015 tracing::warn!(
1016 "No instrument found for symbol: {}",
1017 trade.coin
1018 );
1019 }
1020 }
1021 }
1022 HyperliquidWsMessage::L2Book { data } => {
1023 if let Some(instrument) =
1024 client.get_instrument_by_symbol(&data.coin)
1025 {
1026 let ts_init = clock.get_time_ns();
1027 match parse_ws_order_book_deltas(
1028 &data,
1029 &instrument,
1030 ts_init,
1031 ) {
1032 Ok(deltas) => {
1033 Python::attach(|py| {
1034 let py_obj = data_to_pycapsule(
1035 py,
1036 Data::Deltas(OrderBookDeltas_API::new(
1037 deltas,
1038 )),
1039 );
1040 if let Err(e) =
1041 callback.bind(py).call1((py_obj,))
1042 {
1043 tracing::error!(
1044 "Error calling Python callback: {}",
1045 e
1046 );
1047 }
1048 });
1049 }
1050 Err(e) => {
1051 tracing::error!(
1052 "Error parsing order book deltas: {}",
1053 e
1054 );
1055 }
1056 }
1057 } else {
1058 tracing::warn!(
1059 "No instrument found for symbol: {}",
1060 data.coin
1061 );
1062 }
1063 }
1064 HyperliquidWsMessage::Bbo { data } => {
1065 if let Some(instrument) =
1066 client.get_instrument_by_symbol(&data.coin)
1067 {
1068 let ts_init = clock.get_time_ns();
1069 match parse_ws_quote_tick(&data, &instrument, ts_init) {
1070 Ok(quote) => {
1071 Python::attach(|py| {
1072 let py_obj =
1073 data_to_pycapsule(py, Data::Quote(quote));
1074 if let Err(e) =
1075 callback.bind(py).call1((py_obj,))
1076 {
1077 tracing::error!(
1078 "Error calling Python callback: {}",
1079 e
1080 );
1081 }
1082 });
1083 }
1084 Err(e) => {
1085 tracing::error!("Error parsing quote tick: {}", e);
1086 }
1087 }
1088 } else {
1089 tracing::warn!(
1090 "No instrument found for symbol: {}",
1091 data.coin
1092 );
1093 }
1094 }
1095 HyperliquidWsMessage::Candle { data } => {
1096 if let Some(instrument) =
1097 client.get_instrument_by_symbol(&data.s)
1098 {
1099 let ts_init = clock.get_time_ns();
1100 let bar_type_str =
1103 format!("{}-{}-LAST-EXTERNAL", instrument.id(), data.i);
1104 match bar_type_str.parse::<BarType>() {
1105 Ok(bar_type) => {
1106 match parse_ws_candle(
1107 &data,
1108 &instrument,
1109 &bar_type,
1110 ts_init,
1111 ) {
1112 Ok(bar) => {
1113 Python::attach(|py| {
1114 let py_obj = data_to_pycapsule(
1115 py,
1116 Data::Bar(bar),
1117 );
1118 if let Err(e) =
1119 callback.bind(py).call1((py_obj,))
1120 {
1121 tracing::error!(
1122 "Error calling Python callback: {}",
1123 e
1124 );
1125 }
1126 });
1127 }
1128 Err(e) => {
1129 tracing::error!(
1130 "Error parsing candle: {}",
1131 e
1132 );
1133 }
1134 }
1135 }
1136 Err(e) => {
1137 tracing::error!("Error creating bar type: {}", e);
1138 }
1139 }
1140 } else {
1141 tracing::warn!(
1142 "No instrument found for symbol: {}",
1143 data.s
1144 );
1145 }
1146 }
1147 HyperliquidWsMessage::OrderUpdates { data } => {
1148 for order_update in data {
1150 if let Some(instrument) = client
1151 .get_instrument_by_symbol(&order_update.order.coin)
1152 {
1153 let ts_init = clock.get_time_ns();
1154 let account_id =
1157 nautilus_model::identifiers::AccountId::new(
1158 "HYPERLIQUID-001",
1159 );
1160
1161 match parse_ws_order_status_report(
1162 &order_update,
1163 &instrument,
1164 account_id,
1165 ts_init,
1166 ) {
1167 Ok(report) => {
1168 tracing::info!(
1172 "Parsed order status report: order_id={}, status={:?}",
1173 report.venue_order_id,
1174 report.order_status
1175 );
1176 }
1177 Err(e) => {
1178 tracing::error!(
1179 "Error parsing order update: {}",
1180 e
1181 );
1182 }
1183 }
1184 } else {
1185 tracing::warn!(
1186 "No instrument found for symbol: {}",
1187 order_update.order.coin
1188 );
1189 }
1190 }
1191 }
1192 HyperliquidWsMessage::UserEvents { data } => {
1193 use crate::websocket::messages::WsUserEventData;
1194
1195 let account_id = nautilus_model::identifiers::AccountId::new(
1197 "HYPERLIQUID-001",
1198 );
1199 let ts_init = clock.get_time_ns();
1200
1201 match data {
1202 WsUserEventData::Fills { fills } => {
1203 for fill in fills {
1205 if let Some(instrument) =
1206 client.get_instrument_by_symbol(&fill.coin)
1207 {
1208 match parse_ws_fill_report(
1209 &fill,
1210 &instrument,
1211 account_id,
1212 ts_init,
1213 ) {
1214 Ok(report) => {
1215 tracing::info!(
1219 "Parsed fill report: trade_id={}, side={:?}, qty={}, price={}",
1220 report.trade_id,
1221 report.order_side,
1222 report.last_qty,
1223 report.last_px
1224 );
1225 }
1226 Err(e) => {
1227 tracing::error!(
1228 "Error parsing fill: {}",
1229 e
1230 );
1231 }
1232 }
1233 } else {
1234 tracing::warn!(
1235 "No instrument found for symbol: {}",
1236 fill.coin
1237 );
1238 }
1239 }
1240 }
1241 WsUserEventData::Funding { funding } => {
1242 tracing::debug!(
1243 "Received funding update: {:?}",
1244 funding
1245 );
1246 }
1249 WsUserEventData::Liquidation { liquidation } => {
1250 tracing::warn!(
1251 "Received liquidation event: {:?}",
1252 liquidation
1253 );
1254 }
1257 WsUserEventData::NonUserCancel { non_user_cancel } => {
1258 tracing::info!(
1259 "Received non-user cancel events: {:?}",
1260 non_user_cancel
1261 );
1262 }
1265 WsUserEventData::TriggerActivated { trigger_activated } => {
1266 tracing::debug!(
1267 "Trigger order activated: {:?}",
1268 trigger_activated
1269 );
1270 }
1273 WsUserEventData::TriggerTriggered { trigger_triggered } => {
1274 tracing::debug!(
1275 "Trigger order triggered: {:?}",
1276 trigger_triggered
1277 );
1278 }
1281 }
1282 }
1283 _ => {
1284 tracing::debug!("Unhandled message type: {:?}", msg);
1285 }
1286 }
1287 }
1288 None => {
1289 tracing::info!("WebSocket connection closed");
1290 break;
1291 }
1292 }
1293 }
1294 });
1295
1296 Ok(())
1297 })
1298 }
1299
1300 #[pyo3(name = "wait_until_active")]
1301 fn py_wait_until_active<'py>(
1302 &self,
1303 py: Python<'py>,
1304 timeout_secs: f64,
1305 ) -> PyResult<Bound<'py, PyAny>> {
1306 let client = self.clone();
1307
1308 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1309 let start = std::time::Instant::now();
1310 loop {
1311 if client.is_active().await {
1312 return Ok(());
1313 }
1314
1315 if start.elapsed().as_secs_f64() >= timeout_secs {
1316 return Err(PyRuntimeError::new_err(format!(
1317 "WebSocket connection did not become active within {} seconds",
1318 timeout_secs
1319 )));
1320 }
1321
1322 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1323 }
1324 })
1325 }
1326
1327 #[pyo3(name = "close")]
1328 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1329 let client = self.clone();
1330
1331 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1332 if let Err(e) = client.disconnect().await {
1333 tracing::error!("Error on close: {e}");
1334 }
1335 Ok(())
1336 })
1337 }
1338
1339 #[pyo3(name = "subscribe_trades")]
1340 fn py_subscribe_trades<'py>(
1341 &self,
1342 py: Python<'py>,
1343 instrument_id: InstrumentId,
1344 ) -> PyResult<Bound<'py, PyAny>> {
1345 let client = self.clone();
1346 let coin = Ustr::from(instrument_id.symbol.as_str());
1347
1348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1349 client
1350 .subscribe_trades(coin)
1351 .await
1352 .map_err(to_pyruntime_err)?;
1353 Ok(())
1354 })
1355 }
1356
1357 #[pyo3(name = "unsubscribe_trades")]
1358 fn py_unsubscribe_trades<'py>(
1359 &self,
1360 py: Python<'py>,
1361 instrument_id: InstrumentId,
1362 ) -> PyResult<Bound<'py, PyAny>> {
1363 let client = self.clone();
1364 let coin = Ustr::from(instrument_id.symbol.as_str());
1365
1366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1367 client
1368 .unsubscribe_trades(coin)
1369 .await
1370 .map_err(to_pyruntime_err)?;
1371 Ok(())
1372 })
1373 }
1374
1375 #[pyo3(name = "subscribe_order_book_deltas")]
1376 fn py_subscribe_order_book_deltas<'py>(
1377 &self,
1378 py: Python<'py>,
1379 instrument_id: InstrumentId,
1380 _book_type: u8,
1381 _depth: u64,
1382 ) -> PyResult<Bound<'py, PyAny>> {
1383 let client = self.clone();
1384 let coin = Ustr::from(instrument_id.symbol.as_str());
1385
1386 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1387 client
1388 .subscribe_book(coin)
1389 .await
1390 .map_err(to_pyruntime_err)?;
1391 Ok(())
1392 })
1393 }
1394
1395 #[pyo3(name = "unsubscribe_order_book_deltas")]
1396 fn py_unsubscribe_order_book_deltas<'py>(
1397 &self,
1398 py: Python<'py>,
1399 instrument_id: InstrumentId,
1400 ) -> PyResult<Bound<'py, PyAny>> {
1401 let client = self.clone();
1402 let coin = Ustr::from(instrument_id.symbol.as_str());
1403
1404 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1405 client
1406 .unsubscribe_book(coin)
1407 .await
1408 .map_err(to_pyruntime_err)?;
1409 Ok(())
1410 })
1411 }
1412
1413 #[pyo3(name = "subscribe_order_book_snapshots")]
1414 fn py_subscribe_order_book_snapshots<'py>(
1415 &self,
1416 py: Python<'py>,
1417 instrument_id: InstrumentId,
1418 _book_type: u8,
1419 _depth: u64,
1420 ) -> PyResult<Bound<'py, PyAny>> {
1421 let client = self.clone();
1422 let coin = Ustr::from(instrument_id.symbol.as_str());
1423
1424 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1425 client
1426 .subscribe_book(coin)
1427 .await
1428 .map_err(to_pyruntime_err)?;
1429 Ok(())
1430 })
1431 }
1432
1433 #[pyo3(name = "subscribe_quotes")]
1434 fn py_subscribe_quotes<'py>(
1435 &self,
1436 py: Python<'py>,
1437 instrument_id: InstrumentId,
1438 ) -> PyResult<Bound<'py, PyAny>> {
1439 let client = self.clone();
1440 let coin = Ustr::from(instrument_id.symbol.as_str());
1441
1442 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1443 client.subscribe_bbo(coin).await.map_err(to_pyruntime_err)?;
1444 Ok(())
1445 })
1446 }
1447
1448 #[pyo3(name = "unsubscribe_quotes")]
1449 fn py_unsubscribe_quotes<'py>(
1450 &self,
1451 py: Python<'py>,
1452 instrument_id: InstrumentId,
1453 ) -> PyResult<Bound<'py, PyAny>> {
1454 let client = self.clone();
1455 let coin = Ustr::from(instrument_id.symbol.as_str());
1456
1457 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1458 client
1459 .unsubscribe_bbo(coin)
1460 .await
1461 .map_err(to_pyruntime_err)?;
1462 Ok(())
1463 })
1464 }
1465
1466 #[pyo3(name = "subscribe_bars")]
1467 fn py_subscribe_bars<'py>(
1468 &self,
1469 py: Python<'py>,
1470 bar_type: BarType,
1471 ) -> PyResult<Bound<'py, PyAny>> {
1472 let client = self.clone();
1473 let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1474 let interval = "1m".to_string();
1475
1476 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1477 client
1478 .subscribe_candle(coin, interval)
1479 .await
1480 .map_err(to_pyruntime_err)?;
1481 Ok(())
1482 })
1483 }
1484
1485 #[pyo3(name = "unsubscribe_bars")]
1486 fn py_unsubscribe_bars<'py>(
1487 &self,
1488 py: Python<'py>,
1489 bar_type: BarType,
1490 ) -> PyResult<Bound<'py, PyAny>> {
1491 let client = self.clone();
1492 let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1493 let interval = "1m".to_string();
1494
1495 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1496 client
1497 .unsubscribe_candle(coin, interval)
1498 .await
1499 .map_err(to_pyruntime_err)?;
1500 Ok(())
1501 })
1502 }
1503
1504 #[pyo3(name = "subscribe_order_updates")]
1505 fn py_subscribe_order_updates<'py>(
1506 &self,
1507 py: Python<'py>,
1508 user: String,
1509 ) -> PyResult<Bound<'py, PyAny>> {
1510 let client = self.clone();
1511
1512 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1513 client
1514 .subscribe_order_updates(&user)
1515 .await
1516 .map_err(to_pyruntime_err)?;
1517 Ok(())
1518 })
1519 }
1520
1521 #[pyo3(name = "subscribe_user_events")]
1522 fn py_subscribe_user_events<'py>(
1523 &self,
1524 py: Python<'py>,
1525 user: String,
1526 ) -> PyResult<Bound<'py, PyAny>> {
1527 let client = self.clone();
1528
1529 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1530 client
1531 .subscribe_user_events(&user)
1532 .await
1533 .map_err(to_pyruntime_err)?;
1534 Ok(())
1535 })
1536 }
1537}