1use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use dashmap::DashMap;
19use futures_util::{Stream, future::BoxFuture};
20#[cfg(feature = "python")]
21use nautilus_core::python::to_pyruntime_err;
22use nautilus_core::time::get_atomic_clock_realtime;
23#[cfg(feature = "python")]
24use nautilus_model::{
25 data::{BarType, Data, OrderBookDeltas_API},
26 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
27};
28use nautilus_model::{
29 identifiers::{AccountId, InstrumentId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
33#[cfg(feature = "python")]
34use pyo3::{exceptions::PyRuntimeError, prelude::*};
35use tokio::sync::{RwLock, mpsc};
36use tokio_tungstenite::tungstenite::Message;
37use ustr::Ustr;
38
39#[cfg(feature = "python")]
40use crate::websocket::parse::{
41 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
42};
43use crate::{
44 http::error::{Error, Result as HyperliquidResult},
45 websocket::{
46 messages::{
47 ActionPayload, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest,
48 NautilusWsMessage, PostRequest, PostResponsePayload, SubscriptionRequest,
49 },
50 parse::{parse_ws_fill_report, parse_ws_order_status_report},
51 post::{
52 PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
53 },
54 },
55};
56
57#[derive(Debug, Clone, thiserror::Error)]
59pub enum HyperliquidError {
60 #[error("URL parsing failed: {0}")]
61 UrlParsing(String),
62
63 #[error("Message serialization failed: {0}")]
64 MessageSerialization(String),
65
66 #[error("Message deserialization failed: {0}")]
67 MessageDeserialization(String),
68
69 #[error("WebSocket connection failed: {0}")]
70 Connection(String),
71
72 #[error("Channel send failed: {0}")]
73 ChannelSend(String),
74}
75
76#[derive(Debug, Default)]
81pub struct HyperliquidCodec;
82
83impl HyperliquidCodec {
84 pub fn new() -> Self {
86 Self
87 }
88
89 pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
91 if url.starts_with("ws://") || url.starts_with("wss://") {
92 Ok(())
93 } else {
94 Err(HyperliquidError::UrlParsing(format!(
95 "URL must start with ws:// or wss://, was: {}",
96 url
97 )))
98 }
99 }
100
101 pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
103 serde_json::to_vec(request).map_err(|e| {
104 HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
105 })
106 }
107
108 pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
110 serde_json::from_slice(data).map_err(|e| {
111 HyperliquidError::MessageDeserialization(format!(
112 "Failed to deserialize message: {}",
113 e
114 ))
115 })
116 }
117}
118
119#[derive(Debug)]
124pub struct HyperliquidWebSocketInnerClient {
125 inner: Arc<WebSocketClient>,
126 rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
127 sent_subscriptions: HashSet<String>,
128 _reader_task: tokio::task::JoinHandle<()>,
129 post_router: Arc<PostRouter>,
130 post_ids: PostIds,
131 #[allow(dead_code, reason = "Reserved for future direct WebSocket operations")]
132 ws_sender: WsSender,
133 post_batcher: PostBatcher,
134}
135
136impl HyperliquidWebSocketInnerClient {
137 pub async fn connect(url: &str) -> anyhow::Result<Self> {
140 let (message_handler, mut raw_rx) = channel_message_handler();
142
143 let cfg = WebSocketConfig {
144 url: url.to_string(),
145 headers: vec![],
146 message_handler: Some(message_handler),
147 heartbeat: Some(20), heartbeat_msg: None, ping_handler: None,
150 reconnect_timeout_ms: Some(15_000),
151 reconnect_delay_initial_ms: Some(250),
152 reconnect_delay_max_ms: Some(5_000),
153 reconnect_backoff_factor: Some(2.0),
154 reconnect_jitter_ms: Some(200),
155 };
156
157 let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
158 tracing::info!("Hyperliquid WebSocket connected: {}", url);
159
160 let post_router = PostRouter::new();
161 let post_ids = PostIds::new(1);
162 let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
163 let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
164
165 let ws_sender = WsSender::new(tx_outbound);
166
167 let post_router_for_reader = Arc::clone(&post_router);
169 let reader_task = tokio::spawn(async move {
170 while let Some(msg) = raw_rx.recv().await {
171 match msg {
172 Message::Text(txt) => {
173 tracing::debug!("Received WS text: {}", txt);
174 match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
175 Ok(hl_msg) => {
176 if let HyperliquidWsMessage::Post { data } = &hl_msg {
177 post_router_for_reader.complete(data.clone()).await;
179 }
180 if let Err(e) = tx_inbound.send(hl_msg).await {
181 tracing::error!("Failed to send decoded message: {}", e);
182 break;
183 }
184 }
185 Err(e) => {
186 tracing::error!(
187 "Failed to decode Hyperliquid message: {} | text: {}",
188 e,
189 txt
190 );
191 }
192 }
193 }
194 Message::Binary(data) => {
195 tracing::debug!("Received binary message ({} bytes), ignoring", data.len());
196 }
197 Message::Ping(data) => {
198 tracing::debug!("Received ping frame ({} bytes)", data.len());
199 }
200 Message::Pong(data) => {
201 tracing::debug!("Received pong frame ({} bytes)", data.len());
202 }
203 Message::Close(close_frame) => {
204 tracing::info!("Received close frame: {:?}", close_frame);
205 break;
206 }
207 Message::Frame(_) => tracing::warn!("Received raw frame (unexpected)"),
208 }
209 }
210 tracing::info!("Hyperliquid WebSocket reader finished");
211 });
212
213 let client_for_sender = Arc::clone(&client);
215 tokio::spawn(async move {
216 while let Some(req) = rx_outbound.recv().await {
217 let json = match serde_json::to_string(&req) {
218 Ok(json) => json,
219 Err(e) => {
220 tracing::error!("Failed to serialize WS request: {}", e);
221 continue;
222 }
223 };
224 tracing::debug!("Sending WS message: {}", json);
225 if let Err(e) = client_for_sender.send_text(json, None).await {
226 tracing::error!("Failed to send WS message: {}", e);
227 break;
228 }
229 }
230 tracing::info!("WebSocket sender task finished");
231 });
232
233 let ws_sender_for_batcher = ws_sender.clone();
235
236 let send_fn =
237 move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
238 let sender = ws_sender_for_batcher.clone();
239 Box::pin(async move { sender.send(req).await })
240 };
241
242 let post_batcher = PostBatcher::new(send_fn);
243
244 let hl_client = Self {
245 inner: client,
246 rx_inbound,
247 sent_subscriptions: HashSet::new(),
248 _reader_task: reader_task,
249 post_router,
250 post_ids,
251 ws_sender,
252 post_batcher,
253 };
254
255 Ok(hl_client)
256 }
257
258 pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
260 let json = serde_json::to_string(request)?;
261 tracing::debug!("Sending WS message: {}", json);
262 self.inner
263 .send_text(json, None)
264 .await
265 .map_err(|e| anyhow::anyhow!(e))
266 }
267
268 pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
270 let json = serde_json::to_string(request)?;
271 if self.sent_subscriptions.contains(&json) {
272 tracing::debug!("Skipping duplicate request: {}", json);
273 return Ok(());
274 }
275
276 tracing::debug!("Sending WS message: {}", json);
277 self.inner
278 .send_text(json.clone(), None)
279 .await
280 .map_err(|e| anyhow::anyhow!(e))?;
281
282 self.sent_subscriptions.insert(json);
283 Ok(())
284 }
285
286 pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> anyhow::Result<()> {
288 let request = HyperliquidWsRequest::Subscribe { subscription };
289 self.ws_send_once(&request).await
290 }
291
292 pub async fn ws_unsubscribe(
294 &mut self,
295 subscription: SubscriptionRequest,
296 ) -> anyhow::Result<()> {
297 let request = HyperliquidWsRequest::Unsubscribe { subscription };
298 self.ws_send(&request).await
299 }
300
301 pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
304 self.rx_inbound.recv().await
305 }
306
307 pub fn is_active(&self) -> bool {
309 self.inner.is_active()
310 }
311
312 pub fn is_reconnecting(&self) -> bool {
314 self.inner.is_reconnecting()
315 }
316
317 pub fn is_disconnecting(&self) -> bool {
319 self.inner.is_disconnecting()
320 }
321
322 pub fn is_closed(&self) -> bool {
324 self.inner.is_closed()
325 }
326
327 pub async fn ws_disconnect(&mut self) -> anyhow::Result<()> {
329 self.inner.disconnect().await;
330 Ok(())
331 }
332
333 async fn enqueue_post(
335 &self,
336 id: u64,
337 request: PostRequest,
338 lane: PostLane,
339 ) -> HyperliquidResult<()> {
340 self.post_batcher
341 .enqueue(ScheduledPost { id, request, lane })
342 .await
343 }
344
345 pub async fn post_info_raw(
347 &self,
348 payload: serde_json::Value,
349 timeout: Duration,
350 ) -> HyperliquidResult<PostResponsePayload> {
351 let id = self.post_ids.next();
352 let rx = self.post_router.register(id).await?;
353 self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
354 .await?;
355 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
356 Ok(resp.response)
357 }
358
359 pub async fn post_action_raw(
361 &self,
362 action: ActionPayload,
363 timeout: Duration,
364 ) -> HyperliquidResult<PostResponsePayload> {
365 let id = self.post_ids.next();
366 let rx = self.post_router.register(id).await?;
367 let lane = lane_for_action(&action.action);
368 self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
369 .await?;
370 let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
371 Ok(resp.response)
372 }
373
374 pub async fn info_l2_book(
376 &self,
377 coin: &str,
378 timeout: Duration,
379 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
380 let payload = match self
381 .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
382 .await?
383 {
384 PostResponsePayload::Info { payload } => payload,
385 PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
386 PostResponsePayload::Action { .. } => {
387 return Err(Error::decode("expected info payload, was action"));
388 }
389 };
390 serde_json::from_value(payload).map_err(Error::Serde)
391 }
392}
393
394#[derive(Clone, Debug)]
399#[cfg_attr(
400 feature = "python",
401 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
402)]
403pub struct HyperliquidWebSocketClient {
404 inner: Arc<RwLock<Option<HyperliquidWebSocketInnerClient>>>,
405 url: String,
406 instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
407 instruments_by_symbol: Arc<DashMap<Ustr, InstrumentId>>,
408}
409
410impl HyperliquidWebSocketClient {
411 pub fn new(url: String) -> Self {
414 Self {
415 inner: Arc::new(RwLock::new(None)),
416 url,
417 instruments: Arc::new(DashMap::new()),
418 instruments_by_symbol: Arc::new(DashMap::new()),
419 }
420 }
421
422 pub fn add_instrument(&self, instrument: InstrumentAny) {
424 let instrument_id = instrument.id();
426 self.instruments.insert(instrument_id, instrument);
427
428 let symbol = instrument_id.symbol.as_str();
430 if let Some(coin) = symbol.split('-').next() {
431 self.instruments_by_symbol
432 .insert(Ustr::from(coin), instrument_id);
433 }
434 }
435
436 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
438 self.instruments.get(id).map(|e| e.value().clone())
439 }
440
441 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
443 if let Some(id_entry) = self.instruments_by_symbol.get(symbol) {
445 let instrument_id = *id_entry.value();
446 if let Some(inst_entry) = self.instruments.get(&instrument_id) {
447 return Some(inst_entry.value().clone());
448 }
449 }
450
451 self.instruments
453 .iter()
454 .find(|e| e.key().symbol == (*symbol).into())
455 .map(|e| e.value().clone())
456 }
457
458 pub async fn connect(url: &str) -> anyhow::Result<Self> {
460 let inner_client = HyperliquidWebSocketInnerClient::connect(url).await?;
461 Ok(Self {
462 inner: Arc::new(RwLock::new(Some(inner_client))),
463 url: url.to_string(),
464 instruments: Arc::new(DashMap::new()),
465 instruments_by_symbol: Arc::new(DashMap::new()),
466 })
467 }
468
469 pub async fn ensure_connected(&self) -> anyhow::Result<()> {
471 let mut inner = self.inner.write().await;
472 if inner.is_none() {
473 let inner_client = HyperliquidWebSocketInnerClient::connect(&self.url).await?;
474 *inner = Some(inner_client);
475 }
476 Ok(())
477 }
478
479 pub async fn is_connected(&self) -> bool {
481 let inner = self.inner.read().await;
482 inner.is_some()
483 }
484
485 pub fn url(&self) -> &str {
487 &self.url
488 }
489
490 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
494 self.ensure_connected().await?;
495 let subscription = SubscriptionRequest::OrderUpdates {
496 user: user.to_string(),
497 };
498 let mut inner = self.inner.write().await;
499 inner
500 .as_mut()
501 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
502 .ws_subscribe(subscription)
503 .await
504 }
505
506 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
510 self.ensure_connected().await?;
511 let subscription = SubscriptionRequest::UserEvents {
512 user: user.to_string(),
513 };
514 let mut inner = self.inner.write().await;
515 inner
516 .as_mut()
517 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
518 .ws_subscribe(subscription)
519 .await
520 }
521
522 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
524 self.subscribe_order_updates(user).await?;
525 self.subscribe_user_events(user).await?;
526 Ok(())
527 }
528
529 pub async fn subscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
533 self.ensure_connected().await?;
534 let subscription = SubscriptionRequest::Trades { coin };
535 let mut inner = self.inner.write().await;
536 inner
537 .as_mut()
538 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
539 .ws_subscribe(subscription)
540 .await
541 }
542
543 pub async fn unsubscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
547 self.ensure_connected().await?;
548 let subscription = SubscriptionRequest::Trades { coin };
549 let mut inner = self.inner.write().await;
550 inner
551 .as_mut()
552 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
553 .ws_unsubscribe(subscription)
554 .await
555 }
556
557 pub async fn subscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
561 self.ensure_connected().await?;
562 let subscription = SubscriptionRequest::L2Book {
563 coin,
564 n_sig_figs: None,
565 mantissa: None,
566 };
567 let mut inner = self.inner.write().await;
568 inner
569 .as_mut()
570 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
571 .ws_subscribe(subscription)
572 .await
573 }
574
575 pub async fn unsubscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
579 self.ensure_connected().await?;
580 let subscription = SubscriptionRequest::L2Book {
581 coin,
582 n_sig_figs: None,
583 mantissa: None,
584 };
585 let mut inner = self.inner.write().await;
586 inner
587 .as_mut()
588 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
589 .ws_unsubscribe(subscription)
590 .await
591 }
592
593 pub async fn subscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
597 self.ensure_connected().await?;
598 tracing::info!("Subscribing to BBO for coin: {}", coin);
599 let subscription = SubscriptionRequest::Bbo { coin };
600 let mut inner = self.inner.write().await;
601 inner
602 .as_mut()
603 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
604 .ws_subscribe(subscription)
605 .await
606 }
607
608 pub async fn unsubscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
612 self.ensure_connected().await?;
613 let subscription = SubscriptionRequest::Bbo { coin };
614 let mut inner = self.inner.write().await;
615 inner
616 .as_mut()
617 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
618 .ws_unsubscribe(subscription)
619 .await
620 }
621
622 pub async fn subscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
626 self.ensure_connected().await?;
627 let subscription = SubscriptionRequest::Candle { coin, interval };
628 let mut inner = self.inner.write().await;
629 inner
630 .as_mut()
631 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
632 .ws_subscribe(subscription)
633 .await
634 }
635
636 pub async fn unsubscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
640 self.ensure_connected().await?;
641 let subscription = SubscriptionRequest::Candle { coin, interval };
642 let mut inner = self.inner.write().await;
643 inner
644 .as_mut()
645 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
646 .ws_unsubscribe(subscription)
647 .await
648 }
649
650 pub async fn next_event(&self) -> Option<HyperliquidWsMessage> {
653 let mut inner = self.inner.write().await;
654 if let Some(ref mut client) = *inner {
655 client.ws_next_event().await
656 } else {
657 None
658 }
659 }
660
661 pub async fn is_active(&self) -> bool {
663 let inner = self.inner.read().await;
664 inner.as_ref().is_some_and(|client| client.is_active())
665 }
666
667 pub async fn is_reconnecting(&self) -> bool {
669 let inner = self.inner.read().await;
670 inner
671 .as_ref()
672 .is_some_and(|client| client.is_reconnecting())
673 }
674
675 pub async fn is_disconnecting(&self) -> bool {
677 let inner = self.inner.read().await;
678 inner
679 .as_ref()
680 .is_some_and(|client| client.is_disconnecting())
681 }
682
683 pub async fn is_closed(&self) -> bool {
685 let inner = self.inner.read().await;
686 inner.as_ref().is_none_or(|client| client.is_closed())
687 }
688
689 pub async fn disconnect(&self) -> anyhow::Result<()> {
691 let mut inner = self.inner.write().await;
692 if let Some(ref mut client) = *inner {
693 client.ws_disconnect().await
694 } else {
695 Ok(())
696 }
697 }
698
699 pub async fn send_raw(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
703 self.ensure_connected().await?;
704 let mut inner = self.inner.write().await;
705 inner
706 .as_mut()
707 .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
708 .ws_send(request)
709 .await
710 }
711
712 pub async fn info_l2_book(
716 &self,
717 coin: &str,
718 timeout: Duration,
719 ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
720 self.ensure_connected().await.map_err(|e| Error::Http {
721 status: 500,
722 message: e.to_string(),
723 })?;
724 let mut inner = self.inner.write().await;
725 inner
726 .as_mut()
727 .ok_or_else(|| Error::Http {
728 status: 500,
729 message: "Client not connected".to_string(),
730 })?
731 .info_l2_book(coin, timeout)
732 .await
733 }
734
735 pub async fn post_info_raw(
739 &self,
740 payload: serde_json::Value,
741 timeout: Duration,
742 ) -> HyperliquidResult<PostResponsePayload> {
743 self.ensure_connected().await.map_err(|e| Error::Http {
744 status: 500,
745 message: e.to_string(),
746 })?;
747 let mut inner = self.inner.write().await;
748 inner
749 .as_mut()
750 .ok_or_else(|| Error::Http {
751 status: 500,
752 message: "Client not connected".to_string(),
753 })?
754 .post_info_raw(payload, timeout)
755 .await
756 }
757
758 pub async fn post_action_raw(
762 &self,
763 action: ActionPayload,
764 timeout: Duration,
765 ) -> HyperliquidResult<PostResponsePayload> {
766 self.ensure_connected().await.map_err(|e| Error::Http {
767 status: 500,
768 message: e.to_string(),
769 })?;
770 let mut inner = self.inner.write().await;
771 inner
772 .as_mut()
773 .ok_or_else(|| Error::Http {
774 status: 500,
775 message: "Client not connected".to_string(),
776 })?
777 .post_action_raw(action, timeout)
778 .await
779 }
780
781 pub async fn stream_execution_messages(
801 &self,
802 account_id: AccountId,
803 user_address: String,
804 ) -> anyhow::Result<impl Stream<Item = NautilusWsMessage>> {
805 self.ensure_connected().await?;
807
808 self.subscribe_order_updates(&user_address).await?;
810 self.subscribe_user_events(&user_address).await?;
811
812 let client = self.clone();
813 let (tx, rx) = mpsc::unbounded_channel();
814
815 tokio::spawn(async move {
817 let clock = get_atomic_clock_realtime();
818
819 loop {
820 let event = client.next_event().await;
821
822 match event {
823 Some(msg) => {
824 match &msg {
825 HyperliquidWsMessage::OrderUpdates { data } => {
826 let mut exec_reports = Vec::new();
827
828 for order_update in data {
830 if let Some(instrument) =
831 client.get_instrument_by_symbol(&order_update.order.coin)
832 {
833 let ts_init = clock.get_time_ns();
834
835 match parse_ws_order_status_report(
836 order_update,
837 &instrument,
838 account_id,
839 ts_init,
840 ) {
841 Ok(report) => {
842 exec_reports.push(ExecutionReport::Order(report));
843 }
844 Err(e) => {
845 tracing::error!(
846 "Error parsing order update: {}",
847 e
848 );
849 }
850 }
851 } else {
852 tracing::warn!(
853 "No instrument found for symbol: {}",
854 order_update.order.coin
855 );
856 }
857 }
858
859 if !exec_reports.is_empty()
861 && let Err(e) =
862 tx.send(NautilusWsMessage::ExecutionReports(exec_reports))
863 {
864 tracing::error!("Failed to send execution reports: {}", e);
865 break;
866 }
867 }
868 HyperliquidWsMessage::UserEvents { data } => {
869 use crate::websocket::messages::WsUserEventData;
870
871 let ts_init = clock.get_time_ns();
872
873 match data {
874 WsUserEventData::Fills { fills } => {
875 let mut exec_reports = Vec::new();
876
877 for fill in fills {
879 if let Some(instrument) =
880 client.get_instrument_by_symbol(&fill.coin)
881 {
882 match parse_ws_fill_report(
883 fill,
884 &instrument,
885 account_id,
886 ts_init,
887 ) {
888 Ok(report) => {
889 exec_reports
890 .push(ExecutionReport::Fill(report));
891 }
892 Err(e) => {
893 tracing::error!(
894 "Error parsing fill: {}",
895 e
896 );
897 }
898 }
899 } else {
900 tracing::warn!(
901 "No instrument found for symbol: {}",
902 fill.coin
903 );
904 }
905 }
906
907 if !exec_reports.is_empty()
909 && let Err(e) = tx.send(
910 NautilusWsMessage::ExecutionReports(exec_reports),
911 )
912 {
913 tracing::error!("Failed to send fill reports: {}", e);
914 break;
915 }
916 }
917 _ => {
918 }
920 }
921 }
922 _ => {
923 }
925 }
926 }
927 None => {
928 break;
930 }
931 }
932 }
933 });
934
935 Ok(async_stream::stream! {
937 let mut rx = rx;
938 while let Some(msg) = rx.recv().await {
939 yield msg;
940 }
941 })
942 }
943}
944
945#[cfg(feature = "python")]
947#[pyo3::pymethods]
948impl HyperliquidWebSocketClient {
949 #[new]
950 #[pyo3(signature = (url))]
951 fn py_new(url: String) -> PyResult<Self> {
952 Ok(Self::new(url))
953 }
954
955 #[getter]
956 #[pyo3(name = "url")]
957 #[must_use]
958 pub fn py_url(&self) -> String {
959 self.url().to_string()
960 }
961
962 #[pyo3(name = "is_active")]
963 fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
964 let client = self.clone();
965 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
966 }
967
968 #[pyo3(name = "is_closed")]
969 fn py_is_closed<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
970 let client = self.clone();
971 pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_closed().await) })
972 }
973
974 #[pyo3(name = "connect")]
975 fn py_connect<'py>(
976 &self,
977 py: Python<'py>,
978 instruments: Vec<Py<PyAny>>,
979 callback: Py<PyAny>,
980 ) -> PyResult<Bound<'py, PyAny>> {
981 for inst in instruments {
983 let inst_any = pyobject_to_instrument_any(py, inst)?;
984 self.add_instrument(inst_any);
985 }
986
987 let client = self.clone();
988
989 pyo3_async_runtimes::tokio::future_into_py(py, async move {
990 client.ensure_connected().await.map_err(to_pyruntime_err)?;
991
992 tokio::spawn(async move {
994 let clock = get_atomic_clock_realtime();
995
996 loop {
997 let event = client.next_event().await;
998
999 match event {
1000 Some(msg) => {
1001 tracing::debug!("Received WebSocket message: {:?}", msg);
1002
1003 match msg {
1005 HyperliquidWsMessage::Trades { data } => {
1006 for trade in data {
1007 if let Some(instrument) =
1008 client.get_instrument_by_symbol(&trade.coin)
1009 {
1010 let ts_init = clock.get_time_ns();
1011 match parse_ws_trade_tick(&trade, &instrument, ts_init)
1012 {
1013 Ok(tick) => {
1014 Python::attach(|py| {
1015 let py_obj = data_to_pycapsule(
1016 py,
1017 Data::Trade(tick),
1018 );
1019 if let Err(e) =
1020 callback.bind(py).call1((py_obj,))
1021 {
1022 tracing::error!(
1023 "Error calling Python callback: {}",
1024 e
1025 );
1026 }
1027 });
1028 }
1029 Err(e) => {
1030 tracing::error!(
1031 "Error parsing trade tick: {}",
1032 e
1033 );
1034 }
1035 }
1036 } else {
1037 tracing::warn!(
1038 "No instrument found for symbol: {}",
1039 trade.coin
1040 );
1041 }
1042 }
1043 }
1044 HyperliquidWsMessage::L2Book { data } => {
1045 if let Some(instrument) =
1046 client.get_instrument_by_symbol(&data.coin)
1047 {
1048 let ts_init = clock.get_time_ns();
1049 match parse_ws_order_book_deltas(
1050 &data,
1051 &instrument,
1052 ts_init,
1053 ) {
1054 Ok(deltas) => {
1055 Python::attach(|py| {
1056 let py_obj = data_to_pycapsule(
1057 py,
1058 Data::Deltas(OrderBookDeltas_API::new(
1059 deltas,
1060 )),
1061 );
1062 if let Err(e) =
1063 callback.bind(py).call1((py_obj,))
1064 {
1065 tracing::error!(
1066 "Error calling Python callback: {}",
1067 e
1068 );
1069 }
1070 });
1071 }
1072 Err(e) => {
1073 tracing::error!(
1074 "Error parsing order book deltas: {}",
1075 e
1076 );
1077 }
1078 }
1079 } else {
1080 tracing::warn!(
1081 "No instrument found for symbol: {}",
1082 data.coin
1083 );
1084 }
1085 }
1086 HyperliquidWsMessage::Bbo { data } => {
1087 if let Some(instrument) =
1088 client.get_instrument_by_symbol(&data.coin)
1089 {
1090 let ts_init = clock.get_time_ns();
1091 match parse_ws_quote_tick(&data, &instrument, ts_init) {
1092 Ok(quote) => {
1093 Python::attach(|py| {
1094 let py_obj =
1095 data_to_pycapsule(py, Data::Quote(quote));
1096 if let Err(e) =
1097 callback.bind(py).call1((py_obj,))
1098 {
1099 tracing::error!(
1100 "Error calling Python callback: {}",
1101 e
1102 );
1103 }
1104 });
1105 }
1106 Err(e) => {
1107 tracing::error!("Error parsing quote tick: {}", e);
1108 }
1109 }
1110 } else {
1111 tracing::warn!(
1112 "No instrument found for symbol: {}",
1113 data.coin
1114 );
1115 }
1116 }
1117 HyperliquidWsMessage::Candle { data } => {
1118 if let Some(instrument) =
1119 client.get_instrument_by_symbol(&data.s)
1120 {
1121 let ts_init = clock.get_time_ns();
1122 let bar_type_str =
1125 format!("{}-{}-LAST-EXTERNAL", instrument.id(), data.i);
1126 match bar_type_str.parse::<BarType>() {
1127 Ok(bar_type) => {
1128 match parse_ws_candle(
1129 &data,
1130 &instrument,
1131 &bar_type,
1132 ts_init,
1133 ) {
1134 Ok(bar) => {
1135 Python::attach(|py| {
1136 let py_obj = data_to_pycapsule(
1137 py,
1138 Data::Bar(bar),
1139 );
1140 if let Err(e) =
1141 callback.bind(py).call1((py_obj,))
1142 {
1143 tracing::error!(
1144 "Error calling Python callback: {}",
1145 e
1146 );
1147 }
1148 });
1149 }
1150 Err(e) => {
1151 tracing::error!(
1152 "Error parsing candle: {}",
1153 e
1154 );
1155 }
1156 }
1157 }
1158 Err(e) => {
1159 tracing::error!("Error creating bar type: {}", e);
1160 }
1161 }
1162 } else {
1163 tracing::warn!(
1164 "No instrument found for symbol: {}",
1165 data.s
1166 );
1167 }
1168 }
1169 HyperliquidWsMessage::OrderUpdates { data } => {
1170 for order_update in data {
1172 if let Some(instrument) = client
1173 .get_instrument_by_symbol(&order_update.order.coin)
1174 {
1175 let ts_init = clock.get_time_ns();
1176 let account_id =
1179 nautilus_model::identifiers::AccountId::new(
1180 "HYPERLIQUID-001",
1181 );
1182
1183 match parse_ws_order_status_report(
1184 &order_update,
1185 &instrument,
1186 account_id,
1187 ts_init,
1188 ) {
1189 Ok(report) => {
1190 tracing::info!(
1194 "Parsed order status report: order_id={}, status={:?}",
1195 report.venue_order_id,
1196 report.order_status
1197 );
1198 }
1199 Err(e) => {
1200 tracing::error!(
1201 "Error parsing order update: {}",
1202 e
1203 );
1204 }
1205 }
1206 } else {
1207 tracing::warn!(
1208 "No instrument found for symbol: {}",
1209 order_update.order.coin
1210 );
1211 }
1212 }
1213 }
1214 HyperliquidWsMessage::UserEvents { data } => {
1215 use crate::websocket::messages::WsUserEventData;
1216
1217 let account_id = nautilus_model::identifiers::AccountId::new(
1219 "HYPERLIQUID-001",
1220 );
1221 let ts_init = clock.get_time_ns();
1222
1223 match data {
1224 WsUserEventData::Fills { fills } => {
1225 for fill in fills {
1227 if let Some(instrument) =
1228 client.get_instrument_by_symbol(&fill.coin)
1229 {
1230 match parse_ws_fill_report(
1231 &fill,
1232 &instrument,
1233 account_id,
1234 ts_init,
1235 ) {
1236 Ok(report) => {
1237 tracing::info!(
1241 "Parsed fill report: trade_id={}, side={:?}, qty={}, price={}",
1242 report.trade_id,
1243 report.order_side,
1244 report.last_qty,
1245 report.last_px
1246 );
1247 }
1248 Err(e) => {
1249 tracing::error!(
1250 "Error parsing fill: {}",
1251 e
1252 );
1253 }
1254 }
1255 } else {
1256 tracing::warn!(
1257 "No instrument found for symbol: {}",
1258 fill.coin
1259 );
1260 }
1261 }
1262 }
1263 WsUserEventData::Funding { funding } => {
1264 tracing::debug!(
1265 "Received funding update: {:?}",
1266 funding
1267 );
1268 }
1271 WsUserEventData::Liquidation { liquidation } => {
1272 tracing::warn!(
1273 "Received liquidation event: {:?}",
1274 liquidation
1275 );
1276 }
1279 WsUserEventData::NonUserCancel { non_user_cancel } => {
1280 tracing::info!(
1281 "Received non-user cancel events: {:?}",
1282 non_user_cancel
1283 );
1284 }
1287 WsUserEventData::TriggerActivated { trigger_activated } => {
1288 tracing::debug!(
1289 "Trigger order activated: {:?}",
1290 trigger_activated
1291 );
1292 }
1295 WsUserEventData::TriggerTriggered { trigger_triggered } => {
1296 tracing::debug!(
1297 "Trigger order triggered: {:?}",
1298 trigger_triggered
1299 );
1300 }
1303 }
1304 }
1305 _ => {
1306 tracing::debug!("Unhandled message type: {:?}", msg);
1307 }
1308 }
1309 }
1310 None => {
1311 tracing::info!("WebSocket connection closed");
1312 break;
1313 }
1314 }
1315 }
1316 });
1317
1318 Ok(())
1319 })
1320 }
1321
1322 #[pyo3(name = "wait_until_active")]
1323 fn py_wait_until_active<'py>(
1324 &self,
1325 py: Python<'py>,
1326 timeout_secs: f64,
1327 ) -> PyResult<Bound<'py, PyAny>> {
1328 let client = self.clone();
1329
1330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1331 let start = std::time::Instant::now();
1332 loop {
1333 if client.is_active().await {
1334 return Ok(());
1335 }
1336
1337 if start.elapsed().as_secs_f64() >= timeout_secs {
1338 return Err(PyRuntimeError::new_err(format!(
1339 "WebSocket connection did not become active within {} seconds",
1340 timeout_secs
1341 )));
1342 }
1343
1344 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1345 }
1346 })
1347 }
1348
1349 #[pyo3(name = "close")]
1350 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1351 let client = self.clone();
1352
1353 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1354 if let Err(e) = client.disconnect().await {
1355 tracing::error!("Error on close: {e}");
1356 }
1357 Ok(())
1358 })
1359 }
1360
1361 #[pyo3(name = "subscribe_trades")]
1362 fn py_subscribe_trades<'py>(
1363 &self,
1364 py: Python<'py>,
1365 instrument_id: InstrumentId,
1366 ) -> PyResult<Bound<'py, PyAny>> {
1367 let client = self.clone();
1368 let coin = Ustr::from(instrument_id.symbol.as_str());
1369
1370 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1371 client
1372 .subscribe_trades(coin)
1373 .await
1374 .map_err(to_pyruntime_err)?;
1375 Ok(())
1376 })
1377 }
1378
1379 #[pyo3(name = "unsubscribe_trades")]
1380 fn py_unsubscribe_trades<'py>(
1381 &self,
1382 py: Python<'py>,
1383 instrument_id: InstrumentId,
1384 ) -> PyResult<Bound<'py, PyAny>> {
1385 let client = self.clone();
1386 let coin = Ustr::from(instrument_id.symbol.as_str());
1387
1388 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1389 client
1390 .unsubscribe_trades(coin)
1391 .await
1392 .map_err(to_pyruntime_err)?;
1393 Ok(())
1394 })
1395 }
1396
1397 #[pyo3(name = "subscribe_order_book_deltas")]
1398 fn py_subscribe_order_book_deltas<'py>(
1399 &self,
1400 py: Python<'py>,
1401 instrument_id: InstrumentId,
1402 _book_type: u8,
1403 _depth: u64,
1404 ) -> PyResult<Bound<'py, PyAny>> {
1405 let client = self.clone();
1406 let coin = Ustr::from(instrument_id.symbol.as_str());
1407
1408 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1409 client
1410 .subscribe_book(coin)
1411 .await
1412 .map_err(to_pyruntime_err)?;
1413 Ok(())
1414 })
1415 }
1416
1417 #[pyo3(name = "unsubscribe_order_book_deltas")]
1418 fn py_unsubscribe_order_book_deltas<'py>(
1419 &self,
1420 py: Python<'py>,
1421 instrument_id: InstrumentId,
1422 ) -> PyResult<Bound<'py, PyAny>> {
1423 let client = self.clone();
1424 let coin = Ustr::from(instrument_id.symbol.as_str());
1425
1426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1427 client
1428 .unsubscribe_book(coin)
1429 .await
1430 .map_err(to_pyruntime_err)?;
1431 Ok(())
1432 })
1433 }
1434
1435 #[pyo3(name = "subscribe_order_book_snapshots")]
1436 fn py_subscribe_order_book_snapshots<'py>(
1437 &self,
1438 py: Python<'py>,
1439 instrument_id: InstrumentId,
1440 _book_type: u8,
1441 _depth: u64,
1442 ) -> PyResult<Bound<'py, PyAny>> {
1443 let client = self.clone();
1444 let coin = Ustr::from(instrument_id.symbol.as_str());
1445
1446 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1447 client
1448 .subscribe_book(coin)
1449 .await
1450 .map_err(to_pyruntime_err)?;
1451 Ok(())
1452 })
1453 }
1454
1455 #[pyo3(name = "subscribe_quotes")]
1456 fn py_subscribe_quotes<'py>(
1457 &self,
1458 py: Python<'py>,
1459 instrument_id: InstrumentId,
1460 ) -> PyResult<Bound<'py, PyAny>> {
1461 let client = self.clone();
1462 let coin_str = instrument_id
1464 .symbol
1465 .as_str()
1466 .split('-')
1467 .next()
1468 .ok_or_else(|| {
1469 PyErr::new::<pyo3::exceptions::PyValueError, _>("Invalid instrument symbol")
1470 })?;
1471 let coin = Ustr::from(coin_str);
1472
1473 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1474 client.subscribe_bbo(coin).await.map_err(to_pyruntime_err)?;
1475 Ok(())
1476 })
1477 }
1478
1479 #[pyo3(name = "unsubscribe_quotes")]
1480 fn py_unsubscribe_quotes<'py>(
1481 &self,
1482 py: Python<'py>,
1483 instrument_id: InstrumentId,
1484 ) -> PyResult<Bound<'py, PyAny>> {
1485 let client = self.clone();
1486 let coin_str = instrument_id
1488 .symbol
1489 .as_str()
1490 .split('-')
1491 .next()
1492 .ok_or_else(|| {
1493 PyErr::new::<pyo3::exceptions::PyValueError, _>("Invalid instrument symbol")
1494 })?;
1495 let coin = Ustr::from(coin_str);
1496
1497 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1498 client
1499 .unsubscribe_bbo(coin)
1500 .await
1501 .map_err(to_pyruntime_err)?;
1502 Ok(())
1503 })
1504 }
1505
1506 #[pyo3(name = "subscribe_bars")]
1507 fn py_subscribe_bars<'py>(
1508 &self,
1509 py: Python<'py>,
1510 bar_type: BarType,
1511 ) -> PyResult<Bound<'py, PyAny>> {
1512 let client = self.clone();
1513 let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1514 let interval = "1m".to_string();
1515
1516 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1517 client
1518 .subscribe_candle(coin, interval)
1519 .await
1520 .map_err(to_pyruntime_err)?;
1521 Ok(())
1522 })
1523 }
1524
1525 #[pyo3(name = "unsubscribe_bars")]
1526 fn py_unsubscribe_bars<'py>(
1527 &self,
1528 py: Python<'py>,
1529 bar_type: BarType,
1530 ) -> PyResult<Bound<'py, PyAny>> {
1531 let client = self.clone();
1532 let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1533 let interval = "1m".to_string();
1534
1535 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1536 client
1537 .unsubscribe_candle(coin, interval)
1538 .await
1539 .map_err(to_pyruntime_err)?;
1540 Ok(())
1541 })
1542 }
1543
1544 #[pyo3(name = "subscribe_order_updates")]
1545 fn py_subscribe_order_updates<'py>(
1546 &self,
1547 py: Python<'py>,
1548 user: String,
1549 ) -> PyResult<Bound<'py, PyAny>> {
1550 let client = self.clone();
1551
1552 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1553 client
1554 .subscribe_order_updates(&user)
1555 .await
1556 .map_err(to_pyruntime_err)?;
1557 Ok(())
1558 })
1559 }
1560
1561 #[pyo3(name = "subscribe_user_events")]
1562 fn py_subscribe_user_events<'py>(
1563 &self,
1564 py: Python<'py>,
1565 user: String,
1566 ) -> PyResult<Bound<'py, PyAny>> {
1567 let client = self.clone();
1568
1569 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1570 client
1571 .subscribe_user_events(&user)
1572 .await
1573 .map_err(to_pyruntime_err)?;
1574 Ok(())
1575 })
1576 }
1577}