nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use dashmap::DashMap;
19use futures_util::{Stream, future::BoxFuture};
20#[cfg(feature = "python")]
21use nautilus_core::python::to_pyruntime_err;
22use nautilus_core::time::get_atomic_clock_realtime;
23#[cfg(feature = "python")]
24use nautilus_model::{
25    data::{BarType, Data, OrderBookDeltas_API},
26    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
27};
28use nautilus_model::{
29    identifiers::{AccountId, InstrumentId},
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
33#[cfg(feature = "python")]
34use pyo3::{exceptions::PyRuntimeError, prelude::*};
35use tokio::sync::{RwLock, mpsc};
36use tokio_tungstenite::tungstenite::Message;
37use ustr::Ustr;
38
39#[cfg(feature = "python")]
40use crate::websocket::parse::{
41    parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
42};
43use crate::{
44    http::error::{Error, Result as HyperliquidResult},
45    websocket::{
46        messages::{
47            ActionPayload, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest,
48            NautilusWsMessage, PostRequest, PostResponsePayload, SubscriptionRequest,
49        },
50        parse::{parse_ws_fill_report, parse_ws_order_status_report},
51        post::{
52            PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
53        },
54    },
55};
56
57/// Errors that can occur during Hyperliquid WebSocket operations.
58#[derive(Debug, Clone, thiserror::Error)]
59pub enum HyperliquidError {
60    #[error("URL parsing failed: {0}")]
61    UrlParsing(String),
62
63    #[error("Message serialization failed: {0}")]
64    MessageSerialization(String),
65
66    #[error("Message deserialization failed: {0}")]
67    MessageDeserialization(String),
68
69    #[error("WebSocket connection failed: {0}")]
70    Connection(String),
71
72    #[error("Channel send failed: {0}")]
73    ChannelSend(String),
74}
75
/// Stateless JSON codec for the Hyperliquid WebSocket protocol.
///
/// Offers URL scheme validation plus serialization of outbound requests and
/// deserialization of inbound messages.
#[derive(Debug, Default)]
pub struct HyperliquidCodec;
82
83impl HyperliquidCodec {
84    /// Creates a new Hyperliquid codec instance.
85    pub fn new() -> Self {
86        Self
87    }
88
89    /// Validates that a URL is a proper WebSocket URL.
90    pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
91        if url.starts_with("ws://") || url.starts_with("wss://") {
92            Ok(())
93        } else {
94            Err(HyperliquidError::UrlParsing(format!(
95                "URL must start with ws:// or wss://, was: {}",
96                url
97            )))
98        }
99    }
100
101    /// Encodes a WebSocket request to JSON bytes.
102    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
103        serde_json::to_vec(request).map_err(|e| {
104            HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
105        })
106    }
107
108    /// Decodes JSON bytes to a WebSocket message.
109    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
110        serde_json::from_slice(data).map_err(|e| {
111            HyperliquidError::MessageDeserialization(format!(
112                "Failed to deserialize message: {}",
113                e
114            ))
115        })
116    }
117}
118
119/// Low-level Hyperliquid WebSocket client that wraps Nautilus WebSocketClient.
120///
121/// This is the inner client that handles the transport layer and provides low-level
122/// WebSocket methods with `ws_*` prefixes.
123#[derive(Debug)]
124pub struct HyperliquidWebSocketInnerClient {
125    inner: Arc<WebSocketClient>,
126    rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
127    sent_subscriptions: HashSet<String>,
128    _reader_task: tokio::task::JoinHandle<()>,
129    post_router: Arc<PostRouter>,
130    post_ids: PostIds,
131    #[allow(dead_code, reason = "Reserved for future direct WebSocket operations")]
132    ws_sender: WsSender,
133    post_batcher: PostBatcher,
134}
135
136impl HyperliquidWebSocketInnerClient {
137    /// Creates a new Hyperliquid WebSocket inner client with reconnection/backoff/heartbeat.
138    /// Returns a client that owns the inbound message receiver.
139    pub async fn connect(url: &str) -> anyhow::Result<Self> {
140        // Create message handler for receiving raw WebSocket messages
141        let (message_handler, mut raw_rx) = channel_message_handler();
142
143        let cfg = WebSocketConfig {
144            url: url.to_string(),
145            headers: vec![],
146            message_handler: Some(message_handler),
147            heartbeat: Some(20), // seconds; set lower than server idle timeout
148            heartbeat_msg: None, // use WS Ping frames by default
149            ping_handler: None,
150            reconnect_timeout_ms: Some(15_000),
151            reconnect_delay_initial_ms: Some(250),
152            reconnect_delay_max_ms: Some(5_000),
153            reconnect_backoff_factor: Some(2.0),
154            reconnect_jitter_ms: Some(200),
155        };
156
157        let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
158        tracing::info!("Hyperliquid WebSocket connected: {}", url);
159
160        let post_router = PostRouter::new();
161        let post_ids = PostIds::new(1);
162        let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
163        let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
164
165        let ws_sender = WsSender::new(tx_outbound);
166
167        // Reader task: decode messages and route post replies *before* handing to general pipeline.
168        let post_router_for_reader = Arc::clone(&post_router);
169        let reader_task = tokio::spawn(async move {
170            while let Some(msg) = raw_rx.recv().await {
171                match msg {
172                    Message::Text(txt) => {
173                        tracing::debug!("Received WS text: {}", txt);
174                        match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
175                            Ok(hl_msg) => {
176                                if let HyperliquidWsMessage::Post { data } = &hl_msg {
177                                    // Route the correlated response
178                                    post_router_for_reader.complete(data.clone()).await;
179                                }
180                                if let Err(e) = tx_inbound.send(hl_msg).await {
181                                    tracing::error!("Failed to send decoded message: {}", e);
182                                    break;
183                                }
184                            }
185                            Err(e) => {
186                                tracing::error!(
187                                    "Failed to decode Hyperliquid message: {} | text: {}",
188                                    e,
189                                    txt
190                                );
191                            }
192                        }
193                    }
194                    Message::Binary(data) => {
195                        tracing::debug!("Received binary message ({} bytes), ignoring", data.len());
196                    }
197                    Message::Ping(data) => {
198                        tracing::debug!("Received ping frame ({} bytes)", data.len());
199                    }
200                    Message::Pong(data) => {
201                        tracing::debug!("Received pong frame ({} bytes)", data.len());
202                    }
203                    Message::Close(close_frame) => {
204                        tracing::info!("Received close frame: {:?}", close_frame);
205                        break;
206                    }
207                    Message::Frame(_) => tracing::warn!("Received raw frame (unexpected)"),
208                }
209            }
210            tracing::info!("Hyperliquid WebSocket reader finished");
211        });
212
213        // Spawn task to handle outbound messages
214        let client_for_sender = Arc::clone(&client);
215        tokio::spawn(async move {
216            while let Some(req) = rx_outbound.recv().await {
217                let json = match serde_json::to_string(&req) {
218                    Ok(json) => json,
219                    Err(e) => {
220                        tracing::error!("Failed to serialize WS request: {}", e);
221                        continue;
222                    }
223                };
224                tracing::debug!("Sending WS message: {}", json);
225                if let Err(e) = client_for_sender.send_text(json, None).await {
226                    tracing::error!("Failed to send WS message: {}", e);
227                    break;
228                }
229            }
230            tracing::info!("WebSocket sender task finished");
231        });
232
233        // Create send function for batcher using a proper async closure
234        let ws_sender_for_batcher = ws_sender.clone();
235
236        let send_fn =
237            move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
238                let sender = ws_sender_for_batcher.clone();
239                Box::pin(async move { sender.send(req).await })
240            };
241
242        let post_batcher = PostBatcher::new(send_fn);
243
244        let hl_client = Self {
245            inner: client,
246            rx_inbound,
247            sent_subscriptions: HashSet::new(),
248            _reader_task: reader_task,
249            post_router,
250            post_ids,
251            ws_sender,
252            post_batcher,
253        };
254
255        Ok(hl_client)
256    }
257
258    /// Low-level method to send a Hyperliquid WebSocket request.
259    pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
260        let json = serde_json::to_string(request)?;
261        tracing::debug!("Sending WS message: {}", json);
262        self.inner
263            .send_text(json, None)
264            .await
265            .map_err(|e| anyhow::anyhow!(e))
266    }
267
268    /// Low-level method to send a request only once (dedup by JSON serialization).
269    pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
270        let json = serde_json::to_string(request)?;
271        if self.sent_subscriptions.contains(&json) {
272            tracing::debug!("Skipping duplicate request: {}", json);
273            return Ok(());
274        }
275
276        tracing::debug!("Sending WS message: {}", json);
277        self.inner
278            .send_text(json.clone(), None)
279            .await
280            .map_err(|e| anyhow::anyhow!(e))?;
281
282        self.sent_subscriptions.insert(json);
283        Ok(())
284    }
285
286    /// Low-level method to subscribe to a specific channel.
287    pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> anyhow::Result<()> {
288        let request = HyperliquidWsRequest::Subscribe { subscription };
289        self.ws_send_once(&request).await
290    }
291
292    /// Low-level method to unsubscribe from a specific channel.
293    pub async fn ws_unsubscribe(
294        &mut self,
295        subscription: SubscriptionRequest,
296    ) -> anyhow::Result<()> {
297        let request = HyperliquidWsRequest::Unsubscribe { subscription };
298        self.ws_send(&request).await
299    }
300
301    /// Get the next event from the WebSocket stream.
302    /// Returns None when the connection is closed or the receiver is exhausted.
303    pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
304        self.rx_inbound.recv().await
305    }
306
307    /// Returns true if the WebSocket connection is active.
308    pub fn is_active(&self) -> bool {
309        self.inner.is_active()
310    }
311
312    /// Returns true if the WebSocket is reconnecting.
313    pub fn is_reconnecting(&self) -> bool {
314        self.inner.is_reconnecting()
315    }
316
317    /// Returns true if the WebSocket is disconnecting.
318    pub fn is_disconnecting(&self) -> bool {
319        self.inner.is_disconnecting()
320    }
321
322    /// Returns true if the WebSocket is closed.
323    pub fn is_closed(&self) -> bool {
324        self.inner.is_closed()
325    }
326
327    /// Disconnect the WebSocket client.
328    pub async fn ws_disconnect(&mut self) -> anyhow::Result<()> {
329        self.inner.disconnect().await;
330        Ok(())
331    }
332
333    /// Convenience: enqueue a post on a specific lane.
334    async fn enqueue_post(
335        &self,
336        id: u64,
337        request: PostRequest,
338        lane: PostLane,
339    ) -> HyperliquidResult<()> {
340        self.post_batcher
341            .enqueue(ScheduledPost { id, request, lane })
342            .await
343    }
344
345    /// Core: send an Info post and await response with timeout.
346    pub async fn post_info_raw(
347        &self,
348        payload: serde_json::Value,
349        timeout: Duration,
350    ) -> HyperliquidResult<PostResponsePayload> {
351        let id = self.post_ids.next();
352        let rx = self.post_router.register(id).await?;
353        self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
354            .await?;
355        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
356        Ok(resp.response)
357    }
358
359    /// Core: send an Action post and await response with timeout.
360    pub async fn post_action_raw(
361        &self,
362        action: ActionPayload,
363        timeout: Duration,
364    ) -> HyperliquidResult<PostResponsePayload> {
365        let id = self.post_ids.next();
366        let rx = self.post_router.register(id).await?;
367        let lane = lane_for_action(&action.action);
368        self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
369            .await?;
370        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
371        Ok(resp.response)
372    }
373
374    /// Get l2Book via WS post and parse using shared REST model.
375    pub async fn info_l2_book(
376        &self,
377        coin: &str,
378        timeout: Duration,
379    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
380        let payload = match self
381            .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
382            .await?
383        {
384            PostResponsePayload::Info { payload } => payload,
385            PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
386            PostResponsePayload::Action { .. } => {
387                return Err(Error::decode("expected info payload, was action"));
388            }
389        };
390        serde_json::from_value(payload).map_err(Error::Serde)
391    }
392}
393
394/// High-level Hyperliquid WebSocket client that provides standardized domain methods.
395///
396/// This client uses Arc<RwLock<>> for internal state to support Clone and safe sharing
397/// across async tasks, following the same pattern as other exchange adapters (OKX, Bitmex, Bybit).
398#[derive(Clone, Debug)]
399#[cfg_attr(
400    feature = "python",
401    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
402)]
403pub struct HyperliquidWebSocketClient {
404    inner: Arc<RwLock<Option<HyperliquidWebSocketInnerClient>>>,
405    url: String,
406    instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
407    instruments_by_symbol: Arc<DashMap<Ustr, InstrumentId>>,
408}
409
410impl HyperliquidWebSocketClient {
411    /// Creates a new Hyperliquid WebSocket client without connecting.
412    /// The connection will be established when `ensure_connected()` is called.
413    pub fn new(url: String) -> Self {
414        Self {
415            inner: Arc::new(RwLock::new(None)),
416            url,
417            instruments: Arc::new(DashMap::new()),
418            instruments_by_symbol: Arc::new(DashMap::new()),
419        }
420    }
421
422    /// Adds an instrument to the cache for parsing WebSocket messages.
423    pub fn add_instrument(&self, instrument: InstrumentAny) {
424        // Insert instrument into primary cache
425        let instrument_id = instrument.id();
426        self.instruments.insert(instrument_id, instrument);
427
428        // Extract coin prefix (e.g., "BTC" from "BTC-PERP") and index for fast lookup
429        let symbol = instrument_id.symbol.as_str();
430        if let Some(coin) = symbol.split('-').next() {
431            self.instruments_by_symbol
432                .insert(Ustr::from(coin), instrument_id);
433        }
434    }
435
436    /// Gets an instrument from the cache by ID.
437    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
438        self.instruments.get(id).map(|e| e.value().clone())
439    }
440
441    /// Gets an instrument from the cache by symbol.
442    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
443        // Fast path: lookup instrument id by coin prefix, then fetch instrument by id.
444        if let Some(id_entry) = self.instruments_by_symbol.get(symbol) {
445            let instrument_id = *id_entry.value();
446            if let Some(inst_entry) = self.instruments.get(&instrument_id) {
447                return Some(inst_entry.value().clone());
448            }
449        }
450
451        // Fallback: (should be rare) scan full instruments map to find exact symbol match
452        self.instruments
453            .iter()
454            .find(|e| e.key().symbol == (*symbol).into())
455            .map(|e| e.value().clone())
456    }
457
458    /// Creates a new Hyperliquid WebSocket client and establishes connection.
459    pub async fn connect(url: &str) -> anyhow::Result<Self> {
460        let inner_client = HyperliquidWebSocketInnerClient::connect(url).await?;
461        Ok(Self {
462            inner: Arc::new(RwLock::new(Some(inner_client))),
463            url: url.to_string(),
464            instruments: Arc::new(DashMap::new()),
465            instruments_by_symbol: Arc::new(DashMap::new()),
466        })
467    }
468
469    /// Establishes the WebSocket connection if not already connected.
470    pub async fn ensure_connected(&self) -> anyhow::Result<()> {
471        let mut inner = self.inner.write().await;
472        if inner.is_none() {
473            let inner_client = HyperliquidWebSocketInnerClient::connect(&self.url).await?;
474            *inner = Some(inner_client);
475        }
476        Ok(())
477    }
478
479    /// Returns true if the WebSocket is connected.
480    pub async fn is_connected(&self) -> bool {
481        let inner = self.inner.read().await;
482        inner.is_some()
483    }
484
485    /// Returns the URL of this WebSocket client.
486    pub fn url(&self) -> &str {
487        &self.url
488    }
489
490    /// Subscribe to order updates for a specific user address.
491    ///
492    /// Ensures connection is established before subscribing.
493    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
494        self.ensure_connected().await?;
495        let subscription = SubscriptionRequest::OrderUpdates {
496            user: user.to_string(),
497        };
498        let mut inner = self.inner.write().await;
499        inner
500            .as_mut()
501            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
502            .ws_subscribe(subscription)
503            .await
504    }
505
506    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
507    ///
508    /// Ensures connection is established before subscribing.
509    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
510        self.ensure_connected().await?;
511        let subscription = SubscriptionRequest::UserEvents {
512            user: user.to_string(),
513        };
514        let mut inner = self.inner.write().await;
515        inner
516            .as_mut()
517            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
518            .ws_subscribe(subscription)
519            .await
520    }
521
522    /// Subscribe to all user channels (order updates + user events) for convenience.
523    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
524        self.subscribe_order_updates(user).await?;
525        self.subscribe_user_events(user).await?;
526        Ok(())
527    }
528
529    /// Subscribe to trades for a specific coin.
530    ///
531    /// Ensures connection is established before subscribing.
532    pub async fn subscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
533        self.ensure_connected().await?;
534        let subscription = SubscriptionRequest::Trades { coin };
535        let mut inner = self.inner.write().await;
536        inner
537            .as_mut()
538            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
539            .ws_subscribe(subscription)
540            .await
541    }
542
543    /// Unsubscribe from trades for a specific coin.
544    ///
545    /// Ensures connection is established before unsubscribing.
546    pub async fn unsubscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
547        self.ensure_connected().await?;
548        let subscription = SubscriptionRequest::Trades { coin };
549        let mut inner = self.inner.write().await;
550        inner
551            .as_mut()
552            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
553            .ws_unsubscribe(subscription)
554            .await
555    }
556
557    /// Subscribe to L2 order book for a specific coin.
558    ///
559    /// Ensures connection is established before subscribing.
560    pub async fn subscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
561        self.ensure_connected().await?;
562        let subscription = SubscriptionRequest::L2Book {
563            coin,
564            n_sig_figs: None,
565            mantissa: None,
566        };
567        let mut inner = self.inner.write().await;
568        inner
569            .as_mut()
570            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
571            .ws_subscribe(subscription)
572            .await
573    }
574
575    /// Unsubscribe from L2 order book for a specific coin.
576    ///
577    /// Ensures connection is established before unsubscribing.
578    pub async fn unsubscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
579        self.ensure_connected().await?;
580        let subscription = SubscriptionRequest::L2Book {
581            coin,
582            n_sig_figs: None,
583            mantissa: None,
584        };
585        let mut inner = self.inner.write().await;
586        inner
587            .as_mut()
588            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
589            .ws_unsubscribe(subscription)
590            .await
591    }
592
593    /// Subscribe to BBO (best bid/offer) for a specific coin.
594    ///
595    /// Ensures connection is established before subscribing.
596    pub async fn subscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
597        self.ensure_connected().await?;
598        tracing::info!("Subscribing to BBO for coin: {}", coin);
599        let subscription = SubscriptionRequest::Bbo { coin };
600        let mut inner = self.inner.write().await;
601        inner
602            .as_mut()
603            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
604            .ws_subscribe(subscription)
605            .await
606    }
607
608    /// Unsubscribe from BBO (best bid/offer) for a specific coin.
609    ///
610    /// Ensures connection is established before unsubscribing.
611    pub async fn unsubscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
612        self.ensure_connected().await?;
613        let subscription = SubscriptionRequest::Bbo { coin };
614        let mut inner = self.inner.write().await;
615        inner
616            .as_mut()
617            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
618            .ws_unsubscribe(subscription)
619            .await
620    }
621
622    /// Subscribe to candlestick data for a specific coin and interval.
623    ///
624    /// Ensures connection is established before subscribing.
625    pub async fn subscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
626        self.ensure_connected().await?;
627        let subscription = SubscriptionRequest::Candle { coin, interval };
628        let mut inner = self.inner.write().await;
629        inner
630            .as_mut()
631            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
632            .ws_subscribe(subscription)
633            .await
634    }
635
636    /// Unsubscribe from candlestick data for a specific coin and interval.
637    ///
638    /// Ensures connection is established before unsubscribing.
639    pub async fn unsubscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
640        self.ensure_connected().await?;
641        let subscription = SubscriptionRequest::Candle { coin, interval };
642        let mut inner = self.inner.write().await;
643        inner
644            .as_mut()
645            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
646            .ws_unsubscribe(subscription)
647            .await
648    }
649
650    /// Get the next event from the WebSocket stream.
651    /// Returns None when the connection is closed or the receiver is exhausted.
652    pub async fn next_event(&self) -> Option<HyperliquidWsMessage> {
653        let mut inner = self.inner.write().await;
654        if let Some(ref mut client) = *inner {
655            client.ws_next_event().await
656        } else {
657            None
658        }
659    }
660
661    /// Returns true if the WebSocket connection is active.
662    pub async fn is_active(&self) -> bool {
663        let inner = self.inner.read().await;
664        inner.as_ref().is_some_and(|client| client.is_active())
665    }
666
667    /// Returns true if the WebSocket is reconnecting.
668    pub async fn is_reconnecting(&self) -> bool {
669        let inner = self.inner.read().await;
670        inner
671            .as_ref()
672            .is_some_and(|client| client.is_reconnecting())
673    }
674
675    /// Returns true if the WebSocket is disconnecting.
676    pub async fn is_disconnecting(&self) -> bool {
677        let inner = self.inner.read().await;
678        inner
679            .as_ref()
680            .is_some_and(|client| client.is_disconnecting())
681    }
682
683    /// Returns true if the WebSocket is closed.
684    pub async fn is_closed(&self) -> bool {
685        let inner = self.inner.read().await;
686        inner.as_ref().is_none_or(|client| client.is_closed())
687    }
688
689    /// Disconnect the WebSocket client.
690    pub async fn disconnect(&self) -> anyhow::Result<()> {
691        let mut inner = self.inner.write().await;
692        if let Some(ref mut client) = *inner {
693            client.ws_disconnect().await
694        } else {
695            Ok(())
696        }
697    }
698
699    /// Escape hatch: send raw requests for tests/power users.
700    ///
701    /// Ensures connection is established before sending.
702    pub async fn send_raw(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
703        self.ensure_connected().await?;
704        let mut inner = self.inner.write().await;
705        inner
706            .as_mut()
707            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
708            .ws_send(request)
709            .await
710    }
711
712    /// High-level: call info l2Book (WS post)
713    ///
714    /// Ensures connection is established before making the request.
715    pub async fn info_l2_book(
716        &self,
717        coin: &str,
718        timeout: Duration,
719    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
720        self.ensure_connected().await.map_err(|e| Error::Http {
721            status: 500,
722            message: e.to_string(),
723        })?;
724        let mut inner = self.inner.write().await;
725        inner
726            .as_mut()
727            .ok_or_else(|| Error::Http {
728                status: 500,
729                message: "Client not connected".to_string(),
730            })?
731            .info_l2_book(coin, timeout)
732            .await
733    }
734
735    /// High-level: fire arbitrary info (WS post) returning raw payload.
736    ///
737    /// Ensures connection is established before making the request.
738    pub async fn post_info_raw(
739        &self,
740        payload: serde_json::Value,
741        timeout: Duration,
742    ) -> HyperliquidResult<PostResponsePayload> {
743        self.ensure_connected().await.map_err(|e| Error::Http {
744            status: 500,
745            message: e.to_string(),
746        })?;
747        let mut inner = self.inner.write().await;
748        inner
749            .as_mut()
750            .ok_or_else(|| Error::Http {
751                status: 500,
752                message: "Client not connected".to_string(),
753            })?
754            .post_info_raw(payload, timeout)
755            .await
756    }
757
758    /// High-level: fire action (already signed ActionPayload)
759    ///
760    /// Ensures connection is established before making the request.
761    pub async fn post_action_raw(
762        &self,
763        action: ActionPayload,
764        timeout: Duration,
765    ) -> HyperliquidResult<PostResponsePayload> {
766        self.ensure_connected().await.map_err(|e| Error::Http {
767            status: 500,
768            message: e.to_string(),
769        })?;
770        let mut inner = self.inner.write().await;
771        inner
772            .as_mut()
773            .ok_or_else(|| Error::Http {
774                status: 500,
775                message: "Client not connected".to_string(),
776            })?
777            .post_action_raw(action, timeout)
778            .await
779    }
780
781    /// Creates a stream of execution messages (order updates and fills).
782    ///
783    /// This method spawns a background task that listens for WebSocket messages
784    /// and processes OrderUpdates and UserEvents (fills) into ExecutionReports.
785    /// The execution reports are sent through the returned stream for processing
786    /// by the execution client.
787    ///
788    /// # Arguments
789    ///
790    /// * `account_id` - Account ID for report generation
791    /// * `user_address` - User address to subscribe to order updates and user events
792    ///
793    /// # Returns
794    ///
795    /// A stream of `NautilusWsMessage` containing execution reports
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if subscription fails or connection cannot be established
800    pub async fn stream_execution_messages(
801        &self,
802        account_id: AccountId,
803        user_address: String,
804    ) -> anyhow::Result<impl Stream<Item = NautilusWsMessage>> {
805        // Ensure connection
806        self.ensure_connected().await?;
807
808        // Subscribe to order updates and user events
809        self.subscribe_order_updates(&user_address).await?;
810        self.subscribe_user_events(&user_address).await?;
811
812        let client = self.clone();
813        let (tx, rx) = mpsc::unbounded_channel();
814
815        // Spawn background task to process WebSocket messages
816        tokio::spawn(async move {
817            let clock = get_atomic_clock_realtime();
818
819            loop {
820                let event = client.next_event().await;
821
822                match event {
823                    Some(msg) => {
824                        match &msg {
825                            HyperliquidWsMessage::OrderUpdates { data } => {
826                                let mut exec_reports = Vec::new();
827
828                                // Process each order update in the array
829                                for order_update in data {
830                                    if let Some(instrument) =
831                                        client.get_instrument_by_symbol(&order_update.order.coin)
832                                    {
833                                        let ts_init = clock.get_time_ns();
834
835                                        match parse_ws_order_status_report(
836                                            order_update,
837                                            &instrument,
838                                            account_id,
839                                            ts_init,
840                                        ) {
841                                            Ok(report) => {
842                                                exec_reports.push(ExecutionReport::Order(report));
843                                            }
844                                            Err(e) => {
845                                                tracing::error!(
846                                                    "Error parsing order update: {}",
847                                                    e
848                                                );
849                                            }
850                                        }
851                                    } else {
852                                        tracing::warn!(
853                                            "No instrument found for symbol: {}",
854                                            order_update.order.coin
855                                        );
856                                    }
857                                }
858
859                                // Send reports if any
860                                if !exec_reports.is_empty()
861                                    && let Err(e) =
862                                        tx.send(NautilusWsMessage::ExecutionReports(exec_reports))
863                                {
864                                    tracing::error!("Failed to send execution reports: {}", e);
865                                    break;
866                                }
867                            }
868                            HyperliquidWsMessage::UserEvents { data } => {
869                                use crate::websocket::messages::WsUserEventData;
870
871                                let ts_init = clock.get_time_ns();
872
873                                match data {
874                                    WsUserEventData::Fills { fills } => {
875                                        let mut exec_reports = Vec::new();
876
877                                        // Process each fill
878                                        for fill in fills {
879                                            if let Some(instrument) =
880                                                client.get_instrument_by_symbol(&fill.coin)
881                                            {
882                                                match parse_ws_fill_report(
883                                                    fill,
884                                                    &instrument,
885                                                    account_id,
886                                                    ts_init,
887                                                ) {
888                                                    Ok(report) => {
889                                                        exec_reports
890                                                            .push(ExecutionReport::Fill(report));
891                                                    }
892                                                    Err(e) => {
893                                                        tracing::error!(
894                                                            "Error parsing fill: {}",
895                                                            e
896                                                        );
897                                                    }
898                                                }
899                                            } else {
900                                                tracing::warn!(
901                                                    "No instrument found for symbol: {}",
902                                                    fill.coin
903                                                );
904                                            }
905                                        }
906
907                                        // Send reports if any
908                                        if !exec_reports.is_empty()
909                                            && let Err(e) = tx.send(
910                                                NautilusWsMessage::ExecutionReports(exec_reports),
911                                            )
912                                        {
913                                            tracing::error!("Failed to send fill reports: {}", e);
914                                            break;
915                                        }
916                                    }
917                                    _ => {
918                                        // Other user events (funding, liquidation, etc.) not handled yet
919                                    }
920                                }
921                            }
922                            _ => {
923                                // Ignore other message types in execution stream
924                            }
925                        }
926                    }
927                    None => {
928                        // Connection closed
929                        break;
930                    }
931                }
932            }
933        });
934
935        // Return the stream
936        Ok(async_stream::stream! {
937            let mut rx = rx;
938            while let Some(msg) = rx.recv().await {
939                yield msg;
940            }
941        })
942    }
943}
944
945// Python bindings
946#[cfg(feature = "python")]
947#[pyo3::pymethods]
948impl HyperliquidWebSocketClient {
949    #[new]
950    #[pyo3(signature = (url))]
951    fn py_new(url: String) -> PyResult<Self> {
952        Ok(Self::new(url))
953    }
954
955    #[getter]
956    #[pyo3(name = "url")]
957    #[must_use]
958    pub fn py_url(&self) -> String {
959        self.url().to_string()
960    }
961
962    #[pyo3(name = "is_active")]
963    fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
964        let client = self.clone();
965        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
966    }
967
968    #[pyo3(name = "is_closed")]
969    fn py_is_closed<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
970        let client = self.clone();
971        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_closed().await) })
972    }
973
974    #[pyo3(name = "connect")]
975    fn py_connect<'py>(
976        &self,
977        py: Python<'py>,
978        instruments: Vec<Py<PyAny>>,
979        callback: Py<PyAny>,
980    ) -> PyResult<Bound<'py, PyAny>> {
981        // Parse instruments from Python objects and store in cache
982        for inst in instruments {
983            let inst_any = pyobject_to_instrument_any(py, inst)?;
984            self.add_instrument(inst_any);
985        }
986
987        let client = self.clone();
988
989        pyo3_async_runtimes::tokio::future_into_py(py, async move {
990            client.ensure_connected().await.map_err(to_pyruntime_err)?;
991
992            // Spawn background task to handle incoming messages
993            tokio::spawn(async move {
994                let clock = get_atomic_clock_realtime();
995
996                loop {
997                    let event = client.next_event().await;
998
999                    match event {
1000                        Some(msg) => {
1001                            tracing::debug!("Received WebSocket message: {:?}", msg);
1002
1003                            // Parse and send to Python callback
1004                            match msg {
1005                                HyperliquidWsMessage::Trades { data } => {
1006                                    for trade in data {
1007                                        if let Some(instrument) =
1008                                            client.get_instrument_by_symbol(&trade.coin)
1009                                        {
1010                                            let ts_init = clock.get_time_ns();
1011                                            match parse_ws_trade_tick(&trade, &instrument, ts_init)
1012                                            {
1013                                                Ok(tick) => {
1014                                                    Python::attach(|py| {
1015                                                        let py_obj = data_to_pycapsule(
1016                                                            py,
1017                                                            Data::Trade(tick),
1018                                                        );
1019                                                        if let Err(e) =
1020                                                            callback.bind(py).call1((py_obj,))
1021                                                        {
1022                                                            tracing::error!(
1023                                                                "Error calling Python callback: {}",
1024                                                                e
1025                                                            );
1026                                                        }
1027                                                    });
1028                                                }
1029                                                Err(e) => {
1030                                                    tracing::error!(
1031                                                        "Error parsing trade tick: {}",
1032                                                        e
1033                                                    );
1034                                                }
1035                                            }
1036                                        } else {
1037                                            tracing::warn!(
1038                                                "No instrument found for symbol: {}",
1039                                                trade.coin
1040                                            );
1041                                        }
1042                                    }
1043                                }
1044                                HyperliquidWsMessage::L2Book { data } => {
1045                                    if let Some(instrument) =
1046                                        client.get_instrument_by_symbol(&data.coin)
1047                                    {
1048                                        let ts_init = clock.get_time_ns();
1049                                        match parse_ws_order_book_deltas(
1050                                            &data,
1051                                            &instrument,
1052                                            ts_init,
1053                                        ) {
1054                                            Ok(deltas) => {
1055                                                Python::attach(|py| {
1056                                                    let py_obj = data_to_pycapsule(
1057                                                        py,
1058                                                        Data::Deltas(OrderBookDeltas_API::new(
1059                                                            deltas,
1060                                                        )),
1061                                                    );
1062                                                    if let Err(e) =
1063                                                        callback.bind(py).call1((py_obj,))
1064                                                    {
1065                                                        tracing::error!(
1066                                                            "Error calling Python callback: {}",
1067                                                            e
1068                                                        );
1069                                                    }
1070                                                });
1071                                            }
1072                                            Err(e) => {
1073                                                tracing::error!(
1074                                                    "Error parsing order book deltas: {}",
1075                                                    e
1076                                                );
1077                                            }
1078                                        }
1079                                    } else {
1080                                        tracing::warn!(
1081                                            "No instrument found for symbol: {}",
1082                                            data.coin
1083                                        );
1084                                    }
1085                                }
1086                                HyperliquidWsMessage::Bbo { data } => {
1087                                    if let Some(instrument) =
1088                                        client.get_instrument_by_symbol(&data.coin)
1089                                    {
1090                                        let ts_init = clock.get_time_ns();
1091                                        match parse_ws_quote_tick(&data, &instrument, ts_init) {
1092                                            Ok(quote) => {
1093                                                Python::attach(|py| {
1094                                                    let py_obj =
1095                                                        data_to_pycapsule(py, Data::Quote(quote));
1096                                                    if let Err(e) =
1097                                                        callback.bind(py).call1((py_obj,))
1098                                                    {
1099                                                        tracing::error!(
1100                                                            "Error calling Python callback: {}",
1101                                                            e
1102                                                        );
1103                                                    }
1104                                                });
1105                                            }
1106                                            Err(e) => {
1107                                                tracing::error!("Error parsing quote tick: {}", e);
1108                                            }
1109                                        }
1110                                    } else {
1111                                        tracing::warn!(
1112                                            "No instrument found for symbol: {}",
1113                                            data.coin
1114                                        );
1115                                    }
1116                                }
1117                                HyperliquidWsMessage::Candle { data } => {
1118                                    if let Some(instrument) =
1119                                        client.get_instrument_by_symbol(&data.s)
1120                                    {
1121                                        let ts_init = clock.get_time_ns();
1122                                        // Create a bar type from the instrument and interval
1123                                        // The actual bar type construction should be done based on the interval
1124                                        let bar_type_str =
1125                                            format!("{}-{}-LAST-EXTERNAL", instrument.id(), data.i);
1126                                        match bar_type_str.parse::<BarType>() {
1127                                            Ok(bar_type) => {
1128                                                match parse_ws_candle(
1129                                                    &data,
1130                                                    &instrument,
1131                                                    &bar_type,
1132                                                    ts_init,
1133                                                ) {
1134                                                    Ok(bar) => {
1135                                                        Python::attach(|py| {
1136                                                            let py_obj = data_to_pycapsule(
1137                                                                py,
1138                                                                Data::Bar(bar),
1139                                                            );
1140                                                            if let Err(e) =
1141                                                                callback.bind(py).call1((py_obj,))
1142                                                            {
1143                                                                tracing::error!(
1144                                                                    "Error calling Python callback: {}",
1145                                                                    e
1146                                                                );
1147                                                            }
1148                                                        });
1149                                                    }
1150                                                    Err(e) => {
1151                                                        tracing::error!(
1152                                                            "Error parsing candle: {}",
1153                                                            e
1154                                                        );
1155                                                    }
1156                                                }
1157                                            }
1158                                            Err(e) => {
1159                                                tracing::error!("Error creating bar type: {}", e);
1160                                            }
1161                                        }
1162                                    } else {
1163                                        tracing::warn!(
1164                                            "No instrument found for symbol: {}",
1165                                            data.s
1166                                        );
1167                                    }
1168                                }
1169                                HyperliquidWsMessage::OrderUpdates { data } => {
1170                                    // Process each order update in the array
1171                                    for order_update in data {
1172                                        if let Some(instrument) = client
1173                                            .get_instrument_by_symbol(&order_update.order.coin)
1174                                        {
1175                                            let ts_init = clock.get_time_ns();
1176                                            // We need an account_id - this should come from the client config
1177                                            // For now, use a default account ID
1178                                            let account_id =
1179                                                nautilus_model::identifiers::AccountId::new(
1180                                                    "HYPERLIQUID-001",
1181                                                );
1182
1183                                            match parse_ws_order_status_report(
1184                                                &order_update,
1185                                                &instrument,
1186                                                account_id,
1187                                                ts_init,
1188                                            ) {
1189                                                Ok(report) => {
1190                                                    // Note: Execution reports should be handled via
1191                                                    // stream_execution_messages() for execution clients,
1192                                                    // not through data callbacks
1193                                                    tracing::info!(
1194                                                        "Parsed order status report: order_id={}, status={:?}",
1195                                                        report.venue_order_id,
1196                                                        report.order_status
1197                                                    );
1198                                                }
1199                                                Err(e) => {
1200                                                    tracing::error!(
1201                                                        "Error parsing order update: {}",
1202                                                        e
1203                                                    );
1204                                                }
1205                                            }
1206                                        } else {
1207                                            tracing::warn!(
1208                                                "No instrument found for symbol: {}",
1209                                                order_update.order.coin
1210                                            );
1211                                        }
1212                                    }
1213                                }
1214                                HyperliquidWsMessage::UserEvents { data } => {
1215                                    use crate::websocket::messages::WsUserEventData;
1216
1217                                    // We need an account_id - this should come from the client config
1218                                    let account_id = nautilus_model::identifiers::AccountId::new(
1219                                        "HYPERLIQUID-001",
1220                                    );
1221                                    let ts_init = clock.get_time_ns();
1222
1223                                    match data {
1224                                        WsUserEventData::Fills { fills } => {
1225                                            // Process each fill
1226                                            for fill in fills {
1227                                                if let Some(instrument) =
1228                                                    client.get_instrument_by_symbol(&fill.coin)
1229                                                {
1230                                                    match parse_ws_fill_report(
1231                                                        &fill,
1232                                                        &instrument,
1233                                                        account_id,
1234                                                        ts_init,
1235                                                    ) {
1236                                                        Ok(report) => {
1237                                                            // Note: Execution reports should be handled via
1238                                                            // stream_execution_messages() for execution clients,
1239                                                            // not through data callbacks
1240                                                            tracing::info!(
1241                                                                "Parsed fill report: trade_id={}, side={:?}, qty={}, price={}",
1242                                                                report.trade_id,
1243                                                                report.order_side,
1244                                                                report.last_qty,
1245                                                                report.last_px
1246                                                            );
1247                                                        }
1248                                                        Err(e) => {
1249                                                            tracing::error!(
1250                                                                "Error parsing fill: {}",
1251                                                                e
1252                                                            );
1253                                                        }
1254                                                    }
1255                                                } else {
1256                                                    tracing::warn!(
1257                                                        "No instrument found for symbol: {}",
1258                                                        fill.coin
1259                                                    );
1260                                                }
1261                                            }
1262                                        }
1263                                        WsUserEventData::Funding { funding } => {
1264                                            tracing::debug!(
1265                                                "Received funding update: {:?}",
1266                                                funding
1267                                            );
1268                                            // Funding updates would need to be converted to appropriate Nautilus events
1269                                            // This could be implemented if funding rate updates are needed
1270                                        }
1271                                        WsUserEventData::Liquidation { liquidation } => {
1272                                            tracing::warn!(
1273                                                "Received liquidation event: {:?}",
1274                                                liquidation
1275                                            );
1276                                            // Liquidation events would need special handling
1277                                            // This could be implemented based on requirements
1278                                        }
1279                                        WsUserEventData::NonUserCancel { non_user_cancel } => {
1280                                            tracing::info!(
1281                                                "Received non-user cancel events: {:?}",
1282                                                non_user_cancel
1283                                            );
1284                                            // These are system-initiated cancels (e.g., post-only rejected)
1285                                            // Could be converted to order status updates if needed
1286                                        }
1287                                        WsUserEventData::TriggerActivated { trigger_activated } => {
1288                                            tracing::debug!(
1289                                                "Trigger order activated: {:?}",
1290                                                trigger_activated
1291                                            );
1292                                            // Trigger activation events indicate a conditional order moved to active
1293                                            // Could be converted to order status updates if needed
1294                                        }
1295                                        WsUserEventData::TriggerTriggered { trigger_triggered } => {
1296                                            tracing::debug!(
1297                                                "Trigger order triggered: {:?}",
1298                                                trigger_triggered
1299                                            );
1300                                            // Trigger execution events indicate a conditional order was triggered
1301                                            // Could be converted to order status updates if needed
1302                                        }
1303                                    }
1304                                }
1305                                _ => {
1306                                    tracing::debug!("Unhandled message type: {:?}", msg);
1307                                }
1308                            }
1309                        }
1310                        None => {
1311                            tracing::info!("WebSocket connection closed");
1312                            break;
1313                        }
1314                    }
1315                }
1316            });
1317
1318            Ok(())
1319        })
1320    }
1321
1322    #[pyo3(name = "wait_until_active")]
1323    fn py_wait_until_active<'py>(
1324        &self,
1325        py: Python<'py>,
1326        timeout_secs: f64,
1327    ) -> PyResult<Bound<'py, PyAny>> {
1328        let client = self.clone();
1329
1330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1331            let start = std::time::Instant::now();
1332            loop {
1333                if client.is_active().await {
1334                    return Ok(());
1335                }
1336
1337                if start.elapsed().as_secs_f64() >= timeout_secs {
1338                    return Err(PyRuntimeError::new_err(format!(
1339                        "WebSocket connection did not become active within {} seconds",
1340                        timeout_secs
1341                    )));
1342                }
1343
1344                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1345            }
1346        })
1347    }
1348
1349    #[pyo3(name = "close")]
1350    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1351        let client = self.clone();
1352
1353        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1354            if let Err(e) = client.disconnect().await {
1355                tracing::error!("Error on close: {e}");
1356            }
1357            Ok(())
1358        })
1359    }
1360
1361    #[pyo3(name = "subscribe_trades")]
1362    fn py_subscribe_trades<'py>(
1363        &self,
1364        py: Python<'py>,
1365        instrument_id: InstrumentId,
1366    ) -> PyResult<Bound<'py, PyAny>> {
1367        let client = self.clone();
1368        let coin = Ustr::from(instrument_id.symbol.as_str());
1369
1370        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1371            client
1372                .subscribe_trades(coin)
1373                .await
1374                .map_err(to_pyruntime_err)?;
1375            Ok(())
1376        })
1377    }
1378
1379    #[pyo3(name = "unsubscribe_trades")]
1380    fn py_unsubscribe_trades<'py>(
1381        &self,
1382        py: Python<'py>,
1383        instrument_id: InstrumentId,
1384    ) -> PyResult<Bound<'py, PyAny>> {
1385        let client = self.clone();
1386        let coin = Ustr::from(instrument_id.symbol.as_str());
1387
1388        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1389            client
1390                .unsubscribe_trades(coin)
1391                .await
1392                .map_err(to_pyruntime_err)?;
1393            Ok(())
1394        })
1395    }
1396
1397    #[pyo3(name = "subscribe_order_book_deltas")]
1398    fn py_subscribe_order_book_deltas<'py>(
1399        &self,
1400        py: Python<'py>,
1401        instrument_id: InstrumentId,
1402        _book_type: u8,
1403        _depth: u64,
1404    ) -> PyResult<Bound<'py, PyAny>> {
1405        let client = self.clone();
1406        let coin = Ustr::from(instrument_id.symbol.as_str());
1407
1408        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1409            client
1410                .subscribe_book(coin)
1411                .await
1412                .map_err(to_pyruntime_err)?;
1413            Ok(())
1414        })
1415    }
1416
1417    #[pyo3(name = "unsubscribe_order_book_deltas")]
1418    fn py_unsubscribe_order_book_deltas<'py>(
1419        &self,
1420        py: Python<'py>,
1421        instrument_id: InstrumentId,
1422    ) -> PyResult<Bound<'py, PyAny>> {
1423        let client = self.clone();
1424        let coin = Ustr::from(instrument_id.symbol.as_str());
1425
1426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1427            client
1428                .unsubscribe_book(coin)
1429                .await
1430                .map_err(to_pyruntime_err)?;
1431            Ok(())
1432        })
1433    }
1434
1435    #[pyo3(name = "subscribe_order_book_snapshots")]
1436    fn py_subscribe_order_book_snapshots<'py>(
1437        &self,
1438        py: Python<'py>,
1439        instrument_id: InstrumentId,
1440        _book_type: u8,
1441        _depth: u64,
1442    ) -> PyResult<Bound<'py, PyAny>> {
1443        let client = self.clone();
1444        let coin = Ustr::from(instrument_id.symbol.as_str());
1445
1446        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1447            client
1448                .subscribe_book(coin)
1449                .await
1450                .map_err(to_pyruntime_err)?;
1451            Ok(())
1452        })
1453    }
1454
1455    #[pyo3(name = "subscribe_quotes")]
1456    fn py_subscribe_quotes<'py>(
1457        &self,
1458        py: Python<'py>,
1459        instrument_id: InstrumentId,
1460    ) -> PyResult<Bound<'py, PyAny>> {
1461        let client = self.clone();
1462        // Extract coin from symbol (e.g., "BTC-USD-PERP" -> "BTC")
1463        let coin_str = instrument_id
1464            .symbol
1465            .as_str()
1466            .split('-')
1467            .next()
1468            .ok_or_else(|| {
1469                PyErr::new::<pyo3::exceptions::PyValueError, _>("Invalid instrument symbol")
1470            })?;
1471        let coin = Ustr::from(coin_str);
1472
1473        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1474            client.subscribe_bbo(coin).await.map_err(to_pyruntime_err)?;
1475            Ok(())
1476        })
1477    }
1478
1479    #[pyo3(name = "unsubscribe_quotes")]
1480    fn py_unsubscribe_quotes<'py>(
1481        &self,
1482        py: Python<'py>,
1483        instrument_id: InstrumentId,
1484    ) -> PyResult<Bound<'py, PyAny>> {
1485        let client = self.clone();
1486        // Extract coin from symbol (e.g., "BTC-USD-PERP" -> "BTC")
1487        let coin_str = instrument_id
1488            .symbol
1489            .as_str()
1490            .split('-')
1491            .next()
1492            .ok_or_else(|| {
1493                PyErr::new::<pyo3::exceptions::PyValueError, _>("Invalid instrument symbol")
1494            })?;
1495        let coin = Ustr::from(coin_str);
1496
1497        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1498            client
1499                .unsubscribe_bbo(coin)
1500                .await
1501                .map_err(to_pyruntime_err)?;
1502            Ok(())
1503        })
1504    }
1505
1506    #[pyo3(name = "subscribe_bars")]
1507    fn py_subscribe_bars<'py>(
1508        &self,
1509        py: Python<'py>,
1510        bar_type: BarType,
1511    ) -> PyResult<Bound<'py, PyAny>> {
1512        let client = self.clone();
1513        let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1514        let interval = "1m".to_string();
1515
1516        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1517            client
1518                .subscribe_candle(coin, interval)
1519                .await
1520                .map_err(to_pyruntime_err)?;
1521            Ok(())
1522        })
1523    }
1524
1525    #[pyo3(name = "unsubscribe_bars")]
1526    fn py_unsubscribe_bars<'py>(
1527        &self,
1528        py: Python<'py>,
1529        bar_type: BarType,
1530    ) -> PyResult<Bound<'py, PyAny>> {
1531        let client = self.clone();
1532        let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1533        let interval = "1m".to_string();
1534
1535        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1536            client
1537                .unsubscribe_candle(coin, interval)
1538                .await
1539                .map_err(to_pyruntime_err)?;
1540            Ok(())
1541        })
1542    }
1543
1544    #[pyo3(name = "subscribe_order_updates")]
1545    fn py_subscribe_order_updates<'py>(
1546        &self,
1547        py: Python<'py>,
1548        user: String,
1549    ) -> PyResult<Bound<'py, PyAny>> {
1550        let client = self.clone();
1551
1552        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1553            client
1554                .subscribe_order_updates(&user)
1555                .await
1556                .map_err(to_pyruntime_err)?;
1557            Ok(())
1558        })
1559    }
1560
1561    #[pyo3(name = "subscribe_user_events")]
1562    fn py_subscribe_user_events<'py>(
1563        &self,
1564        py: Python<'py>,
1565        user: String,
1566    ) -> PyResult<Bound<'py, PyAny>> {
1567        let client = self.clone();
1568
1569        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1570            client
1571                .subscribe_user_events(&user)
1572                .await
1573                .map_err(to_pyruntime_err)?;
1574            Ok(())
1575        })
1576    }
1577}