nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashSet, sync::Arc, time::Duration};
17
18use dashmap::DashMap;
19use futures_util::{Stream, future::BoxFuture};
20#[cfg(feature = "python")]
21use nautilus_core::python::to_pyruntime_err;
22use nautilus_core::time::get_atomic_clock_realtime;
23#[cfg(feature = "python")]
24use nautilus_model::{
25    data::{BarType, Data, OrderBookDeltas_API},
26    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
27};
28use nautilus_model::{
29    identifiers::{AccountId, InstrumentId},
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::websocket::{WebSocketClient, WebSocketConfig, channel_message_handler};
33#[cfg(feature = "python")]
34use pyo3::{exceptions::PyRuntimeError, prelude::*};
35use tokio::sync::{RwLock, mpsc};
36use tokio_tungstenite::tungstenite::Message;
37use ustr::Ustr;
38
39#[cfg(feature = "python")]
40use crate::websocket::parse::{
41    parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
42};
43use crate::{
44    http::error::{Error, Result as HyperliquidResult},
45    websocket::{
46        messages::{
47            ActionPayload, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest,
48            NautilusWsMessage, PostRequest, PostResponsePayload, SubscriptionRequest,
49        },
50        parse::{parse_ws_fill_report, parse_ws_order_status_report},
51        post::{
52            PostBatcher, PostIds, PostLane, PostRouter, ScheduledPost, WsSender, lane_for_action,
53        },
54    },
55};
56
57/// Errors that can occur during Hyperliquid WebSocket operations.
58#[derive(Debug, Clone, thiserror::Error)]
59pub enum HyperliquidError {
60    #[error("URL parsing failed: {0}")]
61    UrlParsing(String),
62
63    #[error("Message serialization failed: {0}")]
64    MessageSerialization(String),
65
66    #[error("Message deserialization failed: {0}")]
67    MessageDeserialization(String),
68
69    #[error("WebSocket connection failed: {0}")]
70    Connection(String),
71
72    #[error("Channel send failed: {0}")]
73    ChannelSend(String),
74}
75
76/// Codec for encoding and decoding Hyperliquid WebSocket messages.
77///
78/// This struct provides methods to validate URLs and serialize/deserialize messages
79/// according to the Hyperliquid WebSocket protocol.
80#[derive(Debug, Default)]
81pub struct HyperliquidCodec;
82
83impl HyperliquidCodec {
84    /// Creates a new Hyperliquid codec instance.
85    pub fn new() -> Self {
86        Self
87    }
88
89    /// Validates that a URL is a proper WebSocket URL.
90    pub fn validate_url(url: &str) -> Result<(), HyperliquidError> {
91        if url.starts_with("ws://") || url.starts_with("wss://") {
92            Ok(())
93        } else {
94            Err(HyperliquidError::UrlParsing(format!(
95                "URL must start with ws:// or wss://, was: {}",
96                url
97            )))
98        }
99    }
100
101    /// Encodes a WebSocket request to JSON bytes.
102    pub fn encode(&self, request: &HyperliquidWsRequest) -> Result<Vec<u8>, HyperliquidError> {
103        serde_json::to_vec(request).map_err(|e| {
104            HyperliquidError::MessageSerialization(format!("Failed to serialize request: {}", e))
105        })
106    }
107
108    /// Decodes JSON bytes to a WebSocket message.
109    pub fn decode(&self, data: &[u8]) -> Result<HyperliquidWsMessage, HyperliquidError> {
110        serde_json::from_slice(data).map_err(|e| {
111            HyperliquidError::MessageDeserialization(format!(
112                "Failed to deserialize message: {}",
113                e
114            ))
115        })
116    }
117}
118
119/// Low-level Hyperliquid WebSocket client that wraps Nautilus WebSocketClient.
120///
121/// This is the inner client that handles the transport layer and provides low-level
122/// WebSocket methods with `ws_*` prefixes.
123#[derive(Debug)]
124pub struct HyperliquidWebSocketInnerClient {
125    inner: Arc<WebSocketClient>,
126    rx_inbound: mpsc::Receiver<HyperliquidWsMessage>,
127    sent_subscriptions: HashSet<String>,
128    _reader_task: tokio::task::JoinHandle<()>,
129    post_router: Arc<PostRouter>,
130    post_ids: PostIds,
131    #[allow(dead_code, reason = "Reserved for future direct WebSocket operations")]
132    ws_sender: WsSender,
133    post_batcher: PostBatcher,
134}
135
136impl HyperliquidWebSocketInnerClient {
137    /// Creates a new Hyperliquid WebSocket inner client with reconnection/backoff/heartbeat.
138    /// Returns a client that owns the inbound message receiver.
139    pub async fn connect(url: &str) -> anyhow::Result<Self> {
140        // Create message handler for receiving raw WebSocket messages
141        let (message_handler, mut raw_rx) = channel_message_handler();
142
143        let cfg = WebSocketConfig {
144            url: url.to_string(),
145            headers: vec![],
146            message_handler: Some(message_handler),
147            heartbeat: Some(20), // seconds; set lower than server idle timeout
148            heartbeat_msg: None, // use WS Ping frames by default
149            ping_handler: None,
150            reconnect_timeout_ms: Some(15_000),
151            reconnect_delay_initial_ms: Some(250),
152            reconnect_delay_max_ms: Some(5_000),
153            reconnect_backoff_factor: Some(2.0),
154            reconnect_jitter_ms: Some(200),
155        };
156
157        let client = Arc::new(WebSocketClient::connect(cfg, None, vec![], None).await?);
158        tracing::info!("Hyperliquid WebSocket connected: {}", url);
159
160        let post_router = PostRouter::new();
161        let post_ids = PostIds::new(1);
162        let (tx_inbound, rx_inbound) = mpsc::channel::<HyperliquidWsMessage>(1024);
163        let (tx_outbound, mut rx_outbound) = mpsc::channel::<HyperliquidWsRequest>(1024);
164
165        let ws_sender = WsSender::new(tx_outbound);
166
167        // Reader task: decode messages and route post replies *before* handing to general pipeline.
168        let post_router_for_reader = Arc::clone(&post_router);
169        let reader_task = tokio::spawn(async move {
170            while let Some(msg) = raw_rx.recv().await {
171                match msg {
172                    Message::Text(txt) => {
173                        tracing::debug!("Received WS text: {}", txt);
174                        match serde_json::from_str::<HyperliquidWsMessage>(&txt) {
175                            Ok(hl_msg) => {
176                                if let HyperliquidWsMessage::Post { data } = &hl_msg {
177                                    // Route the correlated response
178                                    post_router_for_reader.complete(data.clone()).await;
179                                }
180                                if let Err(e) = tx_inbound.send(hl_msg).await {
181                                    tracing::error!("Failed to send decoded message: {}", e);
182                                    break;
183                                }
184                            }
185                            Err(err) => {
186                                tracing::error!(
187                                    "Failed to decode Hyperliquid message: {} | text: {}",
188                                    err,
189                                    txt
190                                );
191                            }
192                        }
193                    }
194                    Message::Binary(data) => {
195                        tracing::debug!("Received binary message ({} bytes), ignoring", data.len())
196                    }
197                    Message::Ping(data) => {
198                        tracing::debug!("Received ping frame ({} bytes)", data.len())
199                    }
200                    Message::Pong(data) => {
201                        tracing::debug!("Received pong frame ({} bytes)", data.len())
202                    }
203                    Message::Close(close_frame) => {
204                        tracing::info!("Received close frame: {:?}", close_frame);
205                        break;
206                    }
207                    Message::Frame(_) => tracing::warn!("Received raw frame (unexpected)"),
208                }
209            }
210            tracing::info!("Hyperliquid WebSocket reader finished");
211        });
212
213        // Spawn task to handle outbound messages
214        let client_for_sender = Arc::clone(&client);
215        tokio::spawn(async move {
216            while let Some(req) = rx_outbound.recv().await {
217                let json = match serde_json::to_string(&req) {
218                    Ok(json) => json,
219                    Err(e) => {
220                        tracing::error!("Failed to serialize WS request: {}", e);
221                        continue;
222                    }
223                };
224                tracing::debug!("Sending WS message: {}", json);
225                if let Err(e) = client_for_sender.send_text(json, None).await {
226                    tracing::error!("Failed to send WS message: {}", e);
227                    break;
228                }
229            }
230            tracing::info!("WebSocket sender task finished");
231        });
232
233        // Create send function for batcher using a proper async closure
234        let ws_sender_for_batcher = ws_sender.clone();
235
236        let send_fn =
237            move |req: HyperliquidWsRequest| -> BoxFuture<'static, HyperliquidResult<()>> {
238                let sender = ws_sender_for_batcher.clone();
239                Box::pin(async move { sender.send(req).await })
240            };
241
242        let post_batcher = PostBatcher::new(send_fn);
243
244        let hl_client = Self {
245            inner: client,
246            rx_inbound,
247            sent_subscriptions: HashSet::new(),
248            _reader_task: reader_task,
249            post_router,
250            post_ids,
251            ws_sender,
252            post_batcher,
253        };
254
255        Ok(hl_client)
256    }
257
258    /// Low-level method to send a Hyperliquid WebSocket request.
259    pub async fn ws_send(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
260        let json = serde_json::to_string(request)?;
261        tracing::debug!("Sending WS message: {}", json);
262        self.inner
263            .send_text(json, None)
264            .await
265            .map_err(|e| anyhow::anyhow!(e))
266    }
267
268    /// Low-level method to send a request only once (dedup by JSON serialization).
269    pub async fn ws_send_once(&mut self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
270        let json = serde_json::to_string(request)?;
271        if self.sent_subscriptions.contains(&json) {
272            tracing::debug!("Skipping duplicate request: {}", json);
273            return Ok(());
274        }
275
276        tracing::debug!("Sending WS message: {}", json);
277        self.inner
278            .send_text(json.clone(), None)
279            .await
280            .map_err(|e| anyhow::anyhow!(e))?;
281
282        self.sent_subscriptions.insert(json);
283        Ok(())
284    }
285
286    /// Low-level method to subscribe to a specific channel.
287    pub async fn ws_subscribe(&mut self, subscription: SubscriptionRequest) -> anyhow::Result<()> {
288        let request = HyperliquidWsRequest::Subscribe { subscription };
289        self.ws_send_once(&request).await
290    }
291
292    /// Low-level method to unsubscribe from a specific channel.
293    pub async fn ws_unsubscribe(
294        &mut self,
295        subscription: SubscriptionRequest,
296    ) -> anyhow::Result<()> {
297        let request = HyperliquidWsRequest::Unsubscribe { subscription };
298        self.ws_send(&request).await
299    }
300
301    /// Get the next event from the WebSocket stream.
302    /// Returns None when the connection is closed or the receiver is exhausted.
303    pub async fn ws_next_event(&mut self) -> Option<HyperliquidWsMessage> {
304        self.rx_inbound.recv().await
305    }
306
307    /// Returns true if the WebSocket connection is active.
308    pub fn is_active(&self) -> bool {
309        self.inner.is_active()
310    }
311
312    /// Returns true if the WebSocket is reconnecting.
313    pub fn is_reconnecting(&self) -> bool {
314        self.inner.is_reconnecting()
315    }
316
317    /// Returns true if the WebSocket is disconnecting.
318    pub fn is_disconnecting(&self) -> bool {
319        self.inner.is_disconnecting()
320    }
321
322    /// Returns true if the WebSocket is closed.
323    pub fn is_closed(&self) -> bool {
324        self.inner.is_closed()
325    }
326
327    /// Disconnect the WebSocket client.
328    pub async fn ws_disconnect(&mut self) -> anyhow::Result<()> {
329        self.inner.disconnect().await;
330        Ok(())
331    }
332
333    /// Convenience: enqueue a post on a specific lane.
334    async fn enqueue_post(
335        &self,
336        id: u64,
337        request: PostRequest,
338        lane: PostLane,
339    ) -> HyperliquidResult<()> {
340        self.post_batcher
341            .enqueue(ScheduledPost { id, request, lane })
342            .await
343    }
344
345    /// Core: send an Info post and await response with timeout.
346    pub async fn post_info_raw(
347        &self,
348        payload: serde_json::Value,
349        timeout: Duration,
350    ) -> HyperliquidResult<PostResponsePayload> {
351        let id = self.post_ids.next();
352        let rx = self.post_router.register(id).await?;
353        self.enqueue_post(id, PostRequest::Info { payload }, PostLane::Normal)
354            .await?;
355        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
356        Ok(resp.response)
357    }
358
359    /// Core: send an Action post and await response with timeout.
360    pub async fn post_action_raw(
361        &self,
362        action: ActionPayload,
363        timeout: Duration,
364    ) -> HyperliquidResult<PostResponsePayload> {
365        let id = self.post_ids.next();
366        let rx = self.post_router.register(id).await?;
367        let lane = lane_for_action(&action.action);
368        self.enqueue_post(id, PostRequest::Action { payload: action }, lane)
369            .await?;
370        let resp = self.post_router.await_with_timeout(id, rx, timeout).await?;
371        Ok(resp.response)
372    }
373
374    /// Get l2Book via WS post and parse using shared REST model.
375    pub async fn info_l2_book(
376        &self,
377        coin: &str,
378        timeout: Duration,
379    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
380        let payload = match self
381            .post_info_raw(serde_json::json!({"type":"l2Book","coin":coin}), timeout)
382            .await?
383        {
384            PostResponsePayload::Info { payload } => payload,
385            PostResponsePayload::Error { payload } => return Err(Error::exchange(payload)),
386            PostResponsePayload::Action { .. } => {
387                return Err(Error::decode("expected info payload, was action"));
388            }
389        };
390        serde_json::from_value(payload).map_err(Error::Serde)
391    }
392}
393
394/// High-level Hyperliquid WebSocket client that provides standardized domain methods.
395///
396/// This client uses Arc<RwLock<>> for internal state to support Clone and safe sharing
397/// across async tasks, following the same pattern as other exchange adapters (OKX, Bitmex, Bybit).
398#[derive(Clone, Debug)]
399#[cfg_attr(
400    feature = "python",
401    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
402)]
403pub struct HyperliquidWebSocketClient {
404    inner: Arc<RwLock<Option<HyperliquidWebSocketInnerClient>>>,
405    url: String,
406    instruments: Arc<DashMap<InstrumentId, InstrumentAny>>,
407}
408
409impl HyperliquidWebSocketClient {
410    /// Creates a new Hyperliquid WebSocket client without connecting.
411    /// The connection will be established when `ensure_connected()` is called.
412    pub fn new(url: String) -> Self {
413        Self {
414            inner: Arc::new(RwLock::new(None)),
415            url,
416            instruments: Arc::new(DashMap::new()),
417        }
418    }
419
420    /// Adds an instrument to the cache for parsing WebSocket messages.
421    pub fn add_instrument(&self, instrument: InstrumentAny) {
422        self.instruments.insert(instrument.id(), instrument);
423    }
424
425    /// Gets an instrument from the cache by ID.
426    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
427        self.instruments.get(id).map(|e| e.value().clone())
428    }
429
430    /// Gets an instrument from the cache by symbol.
431    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
432        self.instruments
433            .iter()
434            .find(|e| e.key().symbol == (*symbol).into())
435            .map(|e| e.value().clone())
436    }
437
438    /// Creates a new Hyperliquid WebSocket client and establishes connection.
439    pub async fn connect(url: &str) -> anyhow::Result<Self> {
440        let inner_client = HyperliquidWebSocketInnerClient::connect(url).await?;
441        Ok(Self {
442            inner: Arc::new(RwLock::new(Some(inner_client))),
443            url: url.to_string(),
444            instruments: Arc::new(DashMap::new()),
445        })
446    }
447
448    /// Establishes the WebSocket connection if not already connected.
449    pub async fn ensure_connected(&self) -> anyhow::Result<()> {
450        let mut inner = self.inner.write().await;
451        if inner.is_none() {
452            let inner_client = HyperliquidWebSocketInnerClient::connect(&self.url).await?;
453            *inner = Some(inner_client);
454        }
455        Ok(())
456    }
457
458    /// Returns true if the WebSocket is connected.
459    pub async fn is_connected(&self) -> bool {
460        let inner = self.inner.read().await;
461        inner.is_some()
462    }
463
464    /// Returns the URL of this WebSocket client.
465    pub fn url(&self) -> &str {
466        &self.url
467    }
468
469    /// Subscribe to order updates for a specific user address.
470    ///
471    /// Ensures connection is established before subscribing.
472    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
473        self.ensure_connected().await?;
474        let subscription = SubscriptionRequest::OrderUpdates {
475            user: user.to_string(),
476        };
477        let mut inner = self.inner.write().await;
478        inner
479            .as_mut()
480            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
481            .ws_subscribe(subscription)
482            .await
483    }
484
485    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
486    ///
487    /// Ensures connection is established before subscribing.
488    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
489        self.ensure_connected().await?;
490        let subscription = SubscriptionRequest::UserEvents {
491            user: user.to_string(),
492        };
493        let mut inner = self.inner.write().await;
494        inner
495            .as_mut()
496            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
497            .ws_subscribe(subscription)
498            .await
499    }
500
501    /// Subscribe to all user channels (order updates + user events) for convenience.
502    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
503        self.subscribe_order_updates(user).await?;
504        self.subscribe_user_events(user).await?;
505        Ok(())
506    }
507
508    /// Subscribe to trades for a specific coin.
509    ///
510    /// Ensures connection is established before subscribing.
511    pub async fn subscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
512        self.ensure_connected().await?;
513        let subscription = SubscriptionRequest::Trades { coin };
514        let mut inner = self.inner.write().await;
515        inner
516            .as_mut()
517            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
518            .ws_subscribe(subscription)
519            .await
520    }
521
522    /// Unsubscribe from trades for a specific coin.
523    ///
524    /// Ensures connection is established before unsubscribing.
525    pub async fn unsubscribe_trades(&self, coin: Ustr) -> anyhow::Result<()> {
526        self.ensure_connected().await?;
527        let subscription = SubscriptionRequest::Trades { coin };
528        let mut inner = self.inner.write().await;
529        inner
530            .as_mut()
531            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
532            .ws_unsubscribe(subscription)
533            .await
534    }
535
536    /// Subscribe to L2 order book for a specific coin.
537    ///
538    /// Ensures connection is established before subscribing.
539    pub async fn subscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
540        self.ensure_connected().await?;
541        let subscription = SubscriptionRequest::L2Book {
542            coin,
543            n_sig_figs: None,
544            mantissa: None,
545        };
546        let mut inner = self.inner.write().await;
547        inner
548            .as_mut()
549            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
550            .ws_subscribe(subscription)
551            .await
552    }
553
554    /// Unsubscribe from L2 order book for a specific coin.
555    ///
556    /// Ensures connection is established before unsubscribing.
557    pub async fn unsubscribe_book(&self, coin: Ustr) -> anyhow::Result<()> {
558        self.ensure_connected().await?;
559        let subscription = SubscriptionRequest::L2Book {
560            coin,
561            n_sig_figs: None,
562            mantissa: None,
563        };
564        let mut inner = self.inner.write().await;
565        inner
566            .as_mut()
567            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
568            .ws_unsubscribe(subscription)
569            .await
570    }
571
572    /// Subscribe to BBO (best bid/offer) for a specific coin.
573    ///
574    /// Ensures connection is established before subscribing.
575    pub async fn subscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
576        self.ensure_connected().await?;
577        let subscription = SubscriptionRequest::Bbo { coin };
578        let mut inner = self.inner.write().await;
579        inner
580            .as_mut()
581            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
582            .ws_subscribe(subscription)
583            .await
584    }
585
586    /// Unsubscribe from BBO (best bid/offer) for a specific coin.
587    ///
588    /// Ensures connection is established before unsubscribing.
589    pub async fn unsubscribe_bbo(&self, coin: Ustr) -> anyhow::Result<()> {
590        self.ensure_connected().await?;
591        let subscription = SubscriptionRequest::Bbo { coin };
592        let mut inner = self.inner.write().await;
593        inner
594            .as_mut()
595            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
596            .ws_unsubscribe(subscription)
597            .await
598    }
599
600    /// Subscribe to candlestick data for a specific coin and interval.
601    ///
602    /// Ensures connection is established before subscribing.
603    pub async fn subscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
604        self.ensure_connected().await?;
605        let subscription = SubscriptionRequest::Candle { coin, interval };
606        let mut inner = self.inner.write().await;
607        inner
608            .as_mut()
609            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
610            .ws_subscribe(subscription)
611            .await
612    }
613
614    /// Unsubscribe from candlestick data for a specific coin and interval.
615    ///
616    /// Ensures connection is established before unsubscribing.
617    pub async fn unsubscribe_candle(&self, coin: Ustr, interval: String) -> anyhow::Result<()> {
618        self.ensure_connected().await?;
619        let subscription = SubscriptionRequest::Candle { coin, interval };
620        let mut inner = self.inner.write().await;
621        inner
622            .as_mut()
623            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
624            .ws_unsubscribe(subscription)
625            .await
626    }
627
628    /// Get the next event from the WebSocket stream.
629    /// Returns None when the connection is closed or the receiver is exhausted.
630    pub async fn next_event(&self) -> Option<HyperliquidWsMessage> {
631        let mut inner = self.inner.write().await;
632        if let Some(ref mut client) = *inner {
633            client.ws_next_event().await
634        } else {
635            None
636        }
637    }
638
639    /// Returns true if the WebSocket connection is active.
640    pub async fn is_active(&self) -> bool {
641        let inner = self.inner.read().await;
642        inner.as_ref().is_some_and(|client| client.is_active())
643    }
644
645    /// Returns true if the WebSocket is reconnecting.
646    pub async fn is_reconnecting(&self) -> bool {
647        let inner = self.inner.read().await;
648        inner
649            .as_ref()
650            .is_some_and(|client| client.is_reconnecting())
651    }
652
653    /// Returns true if the WebSocket is disconnecting.
654    pub async fn is_disconnecting(&self) -> bool {
655        let inner = self.inner.read().await;
656        inner
657            .as_ref()
658            .is_some_and(|client| client.is_disconnecting())
659    }
660
661    /// Returns true if the WebSocket is closed.
662    pub async fn is_closed(&self) -> bool {
663        let inner = self.inner.read().await;
664        inner.as_ref().is_none_or(|client| client.is_closed())
665    }
666
667    /// Disconnect the WebSocket client.
668    pub async fn disconnect(&self) -> anyhow::Result<()> {
669        let mut inner = self.inner.write().await;
670        if let Some(ref mut client) = *inner {
671            client.ws_disconnect().await
672        } else {
673            Ok(())
674        }
675    }
676
677    /// Escape hatch: send raw requests for tests/power users.
678    ///
679    /// Ensures connection is established before sending.
680    pub async fn send_raw(&self, request: &HyperliquidWsRequest) -> anyhow::Result<()> {
681        self.ensure_connected().await?;
682        let mut inner = self.inner.write().await;
683        inner
684            .as_mut()
685            .ok_or_else(|| anyhow::anyhow!("Client not connected"))?
686            .ws_send(request)
687            .await
688    }
689
690    /// High-level: call info l2Book (WS post)
691    ///
692    /// Ensures connection is established before making the request.
693    pub async fn info_l2_book(
694        &self,
695        coin: &str,
696        timeout: Duration,
697    ) -> HyperliquidResult<crate::http::models::HyperliquidL2Book> {
698        self.ensure_connected().await.map_err(|e| Error::Http {
699            status: 500,
700            message: e.to_string(),
701        })?;
702        let mut inner = self.inner.write().await;
703        inner
704            .as_mut()
705            .ok_or_else(|| Error::Http {
706                status: 500,
707                message: "Client not connected".to_string(),
708            })?
709            .info_l2_book(coin, timeout)
710            .await
711    }
712
713    /// High-level: fire arbitrary info (WS post) returning raw payload.
714    ///
715    /// Ensures connection is established before making the request.
716    pub async fn post_info_raw(
717        &self,
718        payload: serde_json::Value,
719        timeout: Duration,
720    ) -> HyperliquidResult<PostResponsePayload> {
721        self.ensure_connected().await.map_err(|e| Error::Http {
722            status: 500,
723            message: e.to_string(),
724        })?;
725        let mut inner = self.inner.write().await;
726        inner
727            .as_mut()
728            .ok_or_else(|| Error::Http {
729                status: 500,
730                message: "Client not connected".to_string(),
731            })?
732            .post_info_raw(payload, timeout)
733            .await
734    }
735
736    /// High-level: fire action (already signed ActionPayload)
737    ///
738    /// Ensures connection is established before making the request.
739    pub async fn post_action_raw(
740        &self,
741        action: ActionPayload,
742        timeout: Duration,
743    ) -> HyperliquidResult<PostResponsePayload> {
744        self.ensure_connected().await.map_err(|e| Error::Http {
745            status: 500,
746            message: e.to_string(),
747        })?;
748        let mut inner = self.inner.write().await;
749        inner
750            .as_mut()
751            .ok_or_else(|| Error::Http {
752                status: 500,
753                message: "Client not connected".to_string(),
754            })?
755            .post_action_raw(action, timeout)
756            .await
757    }
758
759    /// Creates a stream of execution messages (order updates and fills).
760    ///
761    /// This method spawns a background task that listens for WebSocket messages
762    /// and processes OrderUpdates and UserEvents (fills) into ExecutionReports.
763    /// The execution reports are sent through the returned stream for processing
764    /// by the execution client.
765    ///
766    /// # Arguments
767    ///
768    /// * `account_id` - Account ID for report generation
769    /// * `user_address` - User address to subscribe to order updates and user events
770    ///
771    /// # Returns
772    ///
773    /// A stream of `NautilusWsMessage` containing execution reports
774    ///
775    /// # Errors
776    ///
777    /// Returns an error if subscription fails or connection cannot be established
778    pub async fn stream_execution_messages(
779        &self,
780        account_id: AccountId,
781        user_address: String,
782    ) -> anyhow::Result<impl Stream<Item = NautilusWsMessage>> {
783        // Ensure connection
784        self.ensure_connected().await?;
785
786        // Subscribe to order updates and user events
787        self.subscribe_order_updates(&user_address).await?;
788        self.subscribe_user_events(&user_address).await?;
789
790        let client = self.clone();
791        let (tx, rx) = mpsc::unbounded_channel();
792
793        // Spawn background task to process WebSocket messages
794        tokio::spawn(async move {
795            let clock = get_atomic_clock_realtime();
796
797            loop {
798                let event = client.next_event().await;
799
800                match event {
801                    Some(msg) => {
802                        match &msg {
803                            HyperliquidWsMessage::OrderUpdates { data } => {
804                                let mut exec_reports = Vec::new();
805
806                                // Process each order update in the array
807                                for order_update in data {
808                                    if let Some(instrument) =
809                                        client.get_instrument_by_symbol(&order_update.order.coin)
810                                    {
811                                        let ts_init = clock.get_time_ns();
812
813                                        match parse_ws_order_status_report(
814                                            order_update,
815                                            &instrument,
816                                            account_id,
817                                            ts_init,
818                                        ) {
819                                            Ok(report) => {
820                                                exec_reports.push(ExecutionReport::Order(report));
821                                            }
822                                            Err(e) => {
823                                                tracing::error!(
824                                                    "Error parsing order update: {}",
825                                                    e
826                                                );
827                                            }
828                                        }
829                                    } else {
830                                        tracing::warn!(
831                                            "No instrument found for symbol: {}",
832                                            order_update.order.coin
833                                        );
834                                    }
835                                }
836
837                                // Send reports if any
838                                if !exec_reports.is_empty()
839                                    && let Err(e) =
840                                        tx.send(NautilusWsMessage::ExecutionReports(exec_reports))
841                                {
842                                    tracing::error!("Failed to send execution reports: {}", e);
843                                    break;
844                                }
845                            }
846                            HyperliquidWsMessage::UserEvents { data } => {
847                                use crate::websocket::messages::WsUserEventData;
848
849                                let ts_init = clock.get_time_ns();
850
851                                match data {
852                                    WsUserEventData::Fills { fills } => {
853                                        let mut exec_reports = Vec::new();
854
855                                        // Process each fill
856                                        for fill in fills {
857                                            if let Some(instrument) =
858                                                client.get_instrument_by_symbol(&fill.coin)
859                                            {
860                                                match parse_ws_fill_report(
861                                                    fill,
862                                                    &instrument,
863                                                    account_id,
864                                                    ts_init,
865                                                ) {
866                                                    Ok(report) => {
867                                                        exec_reports
868                                                            .push(ExecutionReport::Fill(report));
869                                                    }
870                                                    Err(e) => {
871                                                        tracing::error!(
872                                                            "Error parsing fill: {}",
873                                                            e
874                                                        );
875                                                    }
876                                                }
877                                            } else {
878                                                tracing::warn!(
879                                                    "No instrument found for symbol: {}",
880                                                    fill.coin
881                                                );
882                                            }
883                                        }
884
885                                        // Send reports if any
886                                        if !exec_reports.is_empty()
887                                            && let Err(e) = tx.send(
888                                                NautilusWsMessage::ExecutionReports(exec_reports),
889                                            )
890                                        {
891                                            tracing::error!("Failed to send fill reports: {}", e);
892                                            break;
893                                        }
894                                    }
895                                    _ => {
896                                        // Other user events (funding, liquidation, etc.) not handled yet
897                                    }
898                                }
899                            }
900                            _ => {
901                                // Ignore other message types in execution stream
902                            }
903                        }
904                    }
905                    None => {
906                        // Connection closed
907                        break;
908                    }
909                }
910            }
911        });
912
913        // Return the stream
914        Ok(async_stream::stream! {
915            let mut rx = rx;
916            while let Some(msg) = rx.recv().await {
917                yield msg;
918            }
919        })
920    }
921}
922
923// Python bindings
924#[cfg(feature = "python")]
925#[pyo3::pymethods]
926impl HyperliquidWebSocketClient {
927    #[new]
928    #[pyo3(signature = (url))]
929    fn py_new(url: String) -> PyResult<Self> {
930        Ok(Self::new(url))
931    }
932
933    #[getter]
934    #[pyo3(name = "url")]
935    #[must_use]
936    pub fn py_url(&self) -> String {
937        self.url().to_string()
938    }
939
940    #[pyo3(name = "is_active")]
941    fn py_is_active<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
942        let client = self.clone();
943        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_active().await) })
944    }
945
946    #[pyo3(name = "is_closed")]
947    fn py_is_closed<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
948        let client = self.clone();
949        pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(client.is_closed().await) })
950    }
951
952    #[pyo3(name = "connect")]
953    fn py_connect<'py>(
954        &self,
955        py: Python<'py>,
956        instruments: Vec<Py<PyAny>>,
957        callback: Py<PyAny>,
958    ) -> PyResult<Bound<'py, PyAny>> {
959        // Parse instruments from Python objects and store in cache
960        for inst in instruments {
961            let inst_any = pyobject_to_instrument_any(py, inst)?;
962            self.add_instrument(inst_any);
963        }
964
965        let client = self.clone();
966
967        pyo3_async_runtimes::tokio::future_into_py(py, async move {
968            client.ensure_connected().await.map_err(to_pyruntime_err)?;
969
970            // Spawn background task to handle incoming messages
971            tokio::spawn(async move {
972                let clock = get_atomic_clock_realtime();
973
974                loop {
975                    let event = client.next_event().await;
976
977                    match event {
978                        Some(msg) => {
979                            tracing::debug!("Received WebSocket message: {:?}", msg);
980
981                            // Parse and send to Python callback
982                            match msg {
983                                HyperliquidWsMessage::Trades { data } => {
984                                    for trade in data {
985                                        if let Some(instrument) =
986                                            client.get_instrument_by_symbol(&trade.coin)
987                                        {
988                                            let ts_init = clock.get_time_ns();
989                                            match parse_ws_trade_tick(&trade, &instrument, ts_init)
990                                            {
991                                                Ok(tick) => {
992                                                    Python::attach(|py| {
993                                                        let py_obj = data_to_pycapsule(
994                                                            py,
995                                                            Data::Trade(tick),
996                                                        );
997                                                        if let Err(e) =
998                                                            callback.bind(py).call1((py_obj,))
999                                                        {
1000                                                            tracing::error!(
1001                                                                "Error calling Python callback: {}",
1002                                                                e
1003                                                            );
1004                                                        }
1005                                                    });
1006                                                }
1007                                                Err(e) => {
1008                                                    tracing::error!(
1009                                                        "Error parsing trade tick: {}",
1010                                                        e
1011                                                    );
1012                                                }
1013                                            }
1014                                        } else {
1015                                            tracing::warn!(
1016                                                "No instrument found for symbol: {}",
1017                                                trade.coin
1018                                            );
1019                                        }
1020                                    }
1021                                }
1022                                HyperliquidWsMessage::L2Book { data } => {
1023                                    if let Some(instrument) =
1024                                        client.get_instrument_by_symbol(&data.coin)
1025                                    {
1026                                        let ts_init = clock.get_time_ns();
1027                                        match parse_ws_order_book_deltas(
1028                                            &data,
1029                                            &instrument,
1030                                            ts_init,
1031                                        ) {
1032                                            Ok(deltas) => {
1033                                                Python::attach(|py| {
1034                                                    let py_obj = data_to_pycapsule(
1035                                                        py,
1036                                                        Data::Deltas(OrderBookDeltas_API::new(
1037                                                            deltas,
1038                                                        )),
1039                                                    );
1040                                                    if let Err(e) =
1041                                                        callback.bind(py).call1((py_obj,))
1042                                                    {
1043                                                        tracing::error!(
1044                                                            "Error calling Python callback: {}",
1045                                                            e
1046                                                        );
1047                                                    }
1048                                                });
1049                                            }
1050                                            Err(e) => {
1051                                                tracing::error!(
1052                                                    "Error parsing order book deltas: {}",
1053                                                    e
1054                                                );
1055                                            }
1056                                        }
1057                                    } else {
1058                                        tracing::warn!(
1059                                            "No instrument found for symbol: {}",
1060                                            data.coin
1061                                        );
1062                                    }
1063                                }
1064                                HyperliquidWsMessage::Bbo { data } => {
1065                                    if let Some(instrument) =
1066                                        client.get_instrument_by_symbol(&data.coin)
1067                                    {
1068                                        let ts_init = clock.get_time_ns();
1069                                        match parse_ws_quote_tick(&data, &instrument, ts_init) {
1070                                            Ok(quote) => {
1071                                                Python::attach(|py| {
1072                                                    let py_obj =
1073                                                        data_to_pycapsule(py, Data::Quote(quote));
1074                                                    if let Err(e) =
1075                                                        callback.bind(py).call1((py_obj,))
1076                                                    {
1077                                                        tracing::error!(
1078                                                            "Error calling Python callback: {}",
1079                                                            e
1080                                                        );
1081                                                    }
1082                                                });
1083                                            }
1084                                            Err(e) => {
1085                                                tracing::error!("Error parsing quote tick: {}", e);
1086                                            }
1087                                        }
1088                                    } else {
1089                                        tracing::warn!(
1090                                            "No instrument found for symbol: {}",
1091                                            data.coin
1092                                        );
1093                                    }
1094                                }
1095                                HyperliquidWsMessage::Candle { data } => {
1096                                    if let Some(instrument) =
1097                                        client.get_instrument_by_symbol(&data.s)
1098                                    {
1099                                        let ts_init = clock.get_time_ns();
1100                                        // Create a bar type from the instrument and interval
1101                                        // The actual bar type construction should be done based on the interval
1102                                        let bar_type_str =
1103                                            format!("{}-{}-LAST-EXTERNAL", instrument.id(), data.i);
1104                                        match bar_type_str.parse::<BarType>() {
1105                                            Ok(bar_type) => {
1106                                                match parse_ws_candle(
1107                                                    &data,
1108                                                    &instrument,
1109                                                    &bar_type,
1110                                                    ts_init,
1111                                                ) {
1112                                                    Ok(bar) => {
1113                                                        Python::attach(|py| {
1114                                                            let py_obj = data_to_pycapsule(
1115                                                                py,
1116                                                                Data::Bar(bar),
1117                                                            );
1118                                                            if let Err(e) =
1119                                                                callback.bind(py).call1((py_obj,))
1120                                                            {
1121                                                                tracing::error!(
1122                                                                    "Error calling Python callback: {}",
1123                                                                    e
1124                                                                );
1125                                                            }
1126                                                        });
1127                                                    }
1128                                                    Err(e) => {
1129                                                        tracing::error!(
1130                                                            "Error parsing candle: {}",
1131                                                            e
1132                                                        );
1133                                                    }
1134                                                }
1135                                            }
1136                                            Err(e) => {
1137                                                tracing::error!("Error creating bar type: {}", e);
1138                                            }
1139                                        }
1140                                    } else {
1141                                        tracing::warn!(
1142                                            "No instrument found for symbol: {}",
1143                                            data.s
1144                                        );
1145                                    }
1146                                }
1147                                HyperliquidWsMessage::OrderUpdates { data } => {
1148                                    // Process each order update in the array
1149                                    for order_update in data {
1150                                        if let Some(instrument) = client
1151                                            .get_instrument_by_symbol(&order_update.order.coin)
1152                                        {
1153                                            let ts_init = clock.get_time_ns();
1154                                            // We need an account_id - this should come from the client config
1155                                            // For now, use a default account ID
1156                                            let account_id =
1157                                                nautilus_model::identifiers::AccountId::new(
1158                                                    "HYPERLIQUID-001",
1159                                                );
1160
1161                                            match parse_ws_order_status_report(
1162                                                &order_update,
1163                                                &instrument,
1164                                                account_id,
1165                                                ts_init,
1166                                            ) {
1167                                                Ok(report) => {
1168                                                    // Note: Execution reports should be handled via
1169                                                    // stream_execution_messages() for execution clients,
1170                                                    // not through data callbacks
1171                                                    tracing::info!(
1172                                                        "Parsed order status report: order_id={}, status={:?}",
1173                                                        report.venue_order_id,
1174                                                        report.order_status
1175                                                    );
1176                                                }
1177                                                Err(e) => {
1178                                                    tracing::error!(
1179                                                        "Error parsing order update: {}",
1180                                                        e
1181                                                    );
1182                                                }
1183                                            }
1184                                        } else {
1185                                            tracing::warn!(
1186                                                "No instrument found for symbol: {}",
1187                                                order_update.order.coin
1188                                            );
1189                                        }
1190                                    }
1191                                }
1192                                HyperliquidWsMessage::UserEvents { data } => {
1193                                    use crate::websocket::messages::WsUserEventData;
1194
1195                                    // We need an account_id - this should come from the client config
1196                                    let account_id = nautilus_model::identifiers::AccountId::new(
1197                                        "HYPERLIQUID-001",
1198                                    );
1199                                    let ts_init = clock.get_time_ns();
1200
1201                                    match data {
1202                                        WsUserEventData::Fills { fills } => {
1203                                            // Process each fill
1204                                            for fill in fills {
1205                                                if let Some(instrument) =
1206                                                    client.get_instrument_by_symbol(&fill.coin)
1207                                                {
1208                                                    match parse_ws_fill_report(
1209                                                        &fill,
1210                                                        &instrument,
1211                                                        account_id,
1212                                                        ts_init,
1213                                                    ) {
1214                                                        Ok(report) => {
1215                                                            // Note: Execution reports should be handled via
1216                                                            // stream_execution_messages() for execution clients,
1217                                                            // not through data callbacks
1218                                                            tracing::info!(
1219                                                                "Parsed fill report: trade_id={}, side={:?}, qty={}, price={}",
1220                                                                report.trade_id,
1221                                                                report.order_side,
1222                                                                report.last_qty,
1223                                                                report.last_px
1224                                                            );
1225                                                        }
1226                                                        Err(e) => {
1227                                                            tracing::error!(
1228                                                                "Error parsing fill: {}",
1229                                                                e
1230                                                            );
1231                                                        }
1232                                                    }
1233                                                } else {
1234                                                    tracing::warn!(
1235                                                        "No instrument found for symbol: {}",
1236                                                        fill.coin
1237                                                    );
1238                                                }
1239                                            }
1240                                        }
1241                                        WsUserEventData::Funding { funding } => {
1242                                            tracing::debug!(
1243                                                "Received funding update: {:?}",
1244                                                funding
1245                                            );
1246                                            // Funding updates would need to be converted to appropriate Nautilus events
1247                                            // This could be implemented if funding rate updates are needed
1248                                        }
1249                                        WsUserEventData::Liquidation { liquidation } => {
1250                                            tracing::warn!(
1251                                                "Received liquidation event: {:?}",
1252                                                liquidation
1253                                            );
1254                                            // Liquidation events would need special handling
1255                                            // This could be implemented based on requirements
1256                                        }
1257                                        WsUserEventData::NonUserCancel { non_user_cancel } => {
1258                                            tracing::info!(
1259                                                "Received non-user cancel events: {:?}",
1260                                                non_user_cancel
1261                                            );
1262                                            // These are system-initiated cancels (e.g., post-only rejected)
1263                                            // Could be converted to order status updates if needed
1264                                        }
1265                                        WsUserEventData::TriggerActivated { trigger_activated } => {
1266                                            tracing::debug!(
1267                                                "Trigger order activated: {:?}",
1268                                                trigger_activated
1269                                            );
1270                                            // Trigger activation events indicate a conditional order moved to active
1271                                            // Could be converted to order status updates if needed
1272                                        }
1273                                        WsUserEventData::TriggerTriggered { trigger_triggered } => {
1274                                            tracing::debug!(
1275                                                "Trigger order triggered: {:?}",
1276                                                trigger_triggered
1277                                            );
1278                                            // Trigger execution events indicate a conditional order was triggered
1279                                            // Could be converted to order status updates if needed
1280                                        }
1281                                    }
1282                                }
1283                                _ => {
1284                                    tracing::debug!("Unhandled message type: {:?}", msg);
1285                                }
1286                            }
1287                        }
1288                        None => {
1289                            tracing::info!("WebSocket connection closed");
1290                            break;
1291                        }
1292                    }
1293                }
1294            });
1295
1296            Ok(())
1297        })
1298    }
1299
1300    #[pyo3(name = "wait_until_active")]
1301    fn py_wait_until_active<'py>(
1302        &self,
1303        py: Python<'py>,
1304        timeout_secs: f64,
1305    ) -> PyResult<Bound<'py, PyAny>> {
1306        let client = self.clone();
1307
1308        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1309            let start = std::time::Instant::now();
1310            loop {
1311                if client.is_active().await {
1312                    return Ok(());
1313                }
1314
1315                if start.elapsed().as_secs_f64() >= timeout_secs {
1316                    return Err(PyRuntimeError::new_err(format!(
1317                        "WebSocket connection did not become active within {} seconds",
1318                        timeout_secs
1319                    )));
1320                }
1321
1322                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1323            }
1324        })
1325    }
1326
1327    #[pyo3(name = "close")]
1328    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1329        let client = self.clone();
1330
1331        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1332            if let Err(e) = client.disconnect().await {
1333                tracing::error!("Error on close: {e}");
1334            }
1335            Ok(())
1336        })
1337    }
1338
1339    #[pyo3(name = "subscribe_trades")]
1340    fn py_subscribe_trades<'py>(
1341        &self,
1342        py: Python<'py>,
1343        instrument_id: InstrumentId,
1344    ) -> PyResult<Bound<'py, PyAny>> {
1345        let client = self.clone();
1346        let coin = Ustr::from(instrument_id.symbol.as_str());
1347
1348        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1349            client
1350                .subscribe_trades(coin)
1351                .await
1352                .map_err(to_pyruntime_err)?;
1353            Ok(())
1354        })
1355    }
1356
1357    #[pyo3(name = "unsubscribe_trades")]
1358    fn py_unsubscribe_trades<'py>(
1359        &self,
1360        py: Python<'py>,
1361        instrument_id: InstrumentId,
1362    ) -> PyResult<Bound<'py, PyAny>> {
1363        let client = self.clone();
1364        let coin = Ustr::from(instrument_id.symbol.as_str());
1365
1366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1367            client
1368                .unsubscribe_trades(coin)
1369                .await
1370                .map_err(to_pyruntime_err)?;
1371            Ok(())
1372        })
1373    }
1374
1375    #[pyo3(name = "subscribe_order_book_deltas")]
1376    fn py_subscribe_order_book_deltas<'py>(
1377        &self,
1378        py: Python<'py>,
1379        instrument_id: InstrumentId,
1380        _book_type: u8,
1381        _depth: u64,
1382    ) -> PyResult<Bound<'py, PyAny>> {
1383        let client = self.clone();
1384        let coin = Ustr::from(instrument_id.symbol.as_str());
1385
1386        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1387            client
1388                .subscribe_book(coin)
1389                .await
1390                .map_err(to_pyruntime_err)?;
1391            Ok(())
1392        })
1393    }
1394
1395    #[pyo3(name = "unsubscribe_order_book_deltas")]
1396    fn py_unsubscribe_order_book_deltas<'py>(
1397        &self,
1398        py: Python<'py>,
1399        instrument_id: InstrumentId,
1400    ) -> PyResult<Bound<'py, PyAny>> {
1401        let client = self.clone();
1402        let coin = Ustr::from(instrument_id.symbol.as_str());
1403
1404        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1405            client
1406                .unsubscribe_book(coin)
1407                .await
1408                .map_err(to_pyruntime_err)?;
1409            Ok(())
1410        })
1411    }
1412
1413    #[pyo3(name = "subscribe_order_book_snapshots")]
1414    fn py_subscribe_order_book_snapshots<'py>(
1415        &self,
1416        py: Python<'py>,
1417        instrument_id: InstrumentId,
1418        _book_type: u8,
1419        _depth: u64,
1420    ) -> PyResult<Bound<'py, PyAny>> {
1421        let client = self.clone();
1422        let coin = Ustr::from(instrument_id.symbol.as_str());
1423
1424        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1425            client
1426                .subscribe_book(coin)
1427                .await
1428                .map_err(to_pyruntime_err)?;
1429            Ok(())
1430        })
1431    }
1432
1433    #[pyo3(name = "subscribe_quotes")]
1434    fn py_subscribe_quotes<'py>(
1435        &self,
1436        py: Python<'py>,
1437        instrument_id: InstrumentId,
1438    ) -> PyResult<Bound<'py, PyAny>> {
1439        let client = self.clone();
1440        let coin = Ustr::from(instrument_id.symbol.as_str());
1441
1442        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1443            client.subscribe_bbo(coin).await.map_err(to_pyruntime_err)?;
1444            Ok(())
1445        })
1446    }
1447
1448    #[pyo3(name = "unsubscribe_quotes")]
1449    fn py_unsubscribe_quotes<'py>(
1450        &self,
1451        py: Python<'py>,
1452        instrument_id: InstrumentId,
1453    ) -> PyResult<Bound<'py, PyAny>> {
1454        let client = self.clone();
1455        let coin = Ustr::from(instrument_id.symbol.as_str());
1456
1457        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1458            client
1459                .unsubscribe_bbo(coin)
1460                .await
1461                .map_err(to_pyruntime_err)?;
1462            Ok(())
1463        })
1464    }
1465
1466    #[pyo3(name = "subscribe_bars")]
1467    fn py_subscribe_bars<'py>(
1468        &self,
1469        py: Python<'py>,
1470        bar_type: BarType,
1471    ) -> PyResult<Bound<'py, PyAny>> {
1472        let client = self.clone();
1473        let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1474        let interval = "1m".to_string();
1475
1476        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1477            client
1478                .subscribe_candle(coin, interval)
1479                .await
1480                .map_err(to_pyruntime_err)?;
1481            Ok(())
1482        })
1483    }
1484
1485    #[pyo3(name = "unsubscribe_bars")]
1486    fn py_unsubscribe_bars<'py>(
1487        &self,
1488        py: Python<'py>,
1489        bar_type: BarType,
1490    ) -> PyResult<Bound<'py, PyAny>> {
1491        let client = self.clone();
1492        let coin = Ustr::from(bar_type.instrument_id().symbol.as_str());
1493        let interval = "1m".to_string();
1494
1495        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1496            client
1497                .unsubscribe_candle(coin, interval)
1498                .await
1499                .map_err(to_pyruntime_err)?;
1500            Ok(())
1501        })
1502    }
1503
1504    #[pyo3(name = "subscribe_order_updates")]
1505    fn py_subscribe_order_updates<'py>(
1506        &self,
1507        py: Python<'py>,
1508        user: String,
1509    ) -> PyResult<Bound<'py, PyAny>> {
1510        let client = self.clone();
1511
1512        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1513            client
1514                .subscribe_order_updates(&user)
1515                .await
1516                .map_err(to_pyruntime_err)?;
1517            Ok(())
1518        })
1519    }
1520
1521    #[pyo3(name = "subscribe_user_events")]
1522    fn py_subscribe_user_events<'py>(
1523        &self,
1524        py: Python<'py>,
1525        user: String,
1526    ) -> PyResult<Bound<'py, PyAny>> {
1527        let client = self.clone();
1528
1529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1530            client
1531                .subscribe_user_events(&user)
1532                .await
1533                .map_err(to_pyruntime_err)?;
1534            Ok(())
1535        })
1536    }
1537}