nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
32/// Rate limit key for subscription operations (subscribe/unsubscribe).
33///
34/// dYdX allows up to 2 subscription messages per second per connection.
35/// See: <https://docs.dydx.trade/developers/indexer/websockets#rate-limits>
36pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";
37
38/// WebSocket topic delimiter for dYdX (channel:symbol format).
39pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
40
41/// Default WebSocket quota for dYdX subscriptions (2 messages per second).
42pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
43    LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46    num::NonZeroU32,
47    sync::{
48        Arc, LazyLock,
49        atomic::{AtomicBool, AtomicU8, Ordering},
50    },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57    identifiers::{AccountId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59};
60use nautilus_network::{
61    mode::ConnectionMode,
62    ratelimiter::quota::Quota,
63    websocket::{
64        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65    },
66};
67use ustr::Ustr;
68
69use super::{
70    enums::NautilusWsMessage,
71    error::{DydxWsError, DydxWsResult},
72    handler::{FeedHandler, HandlerCommand},
73};
74use crate::common::credential::DydxCredential;
75
76/// WebSocket client for dYdX v4 market data and account streams.
77///
78/// # Authentication
79///
80/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
81/// Public channels work without any credentials. Private channels (subaccounts) only
82/// need the wallet address included in the subscription message.
83///
84/// The [`DydxCredential`] stored in this client is used for:
85/// - Providing the wallet address for private channel subscriptions
86/// - Transaction signing (when placing orders via the validator node)
87///
88/// It is **NOT** used for WebSocket message signing or authentication.
89///
90/// # Architecture
91///
92/// This client follows a two-layer architecture:
93/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
94/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
95///
96/// Communication uses lock-free channels:
97/// - Commands flow from client → handler via `cmd_tx`
98/// - Parsed events flow from handler → client via `out_rx`
99#[derive(Debug)]
100#[cfg_attr(
101    feature = "python",
102    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
103)]
104pub struct DydxWebSocketClient {
105    /// The WebSocket connection URL.
106    url: String,
107    /// Optional credential for private channels (only wallet address is used).
108    credential: Option<Arc<DydxCredential>>,
109    /// Whether authentication is required for this client.
110    requires_auth: bool,
111    /// Authentication tracker for WebSocket connections.
112    auth_tracker: AuthTracker,
113    /// Subscription state tracker for managing channel subscriptions.
114    subscriptions: SubscriptionState,
115    /// Shared connection state (lock-free atomic).
116    connection_mode: Arc<ArcSwap<AtomicU8>>,
117    /// Manual disconnect signal.
118    signal: Arc<AtomicBool>,
119    /// Cached instruments for parsing market data (Python-accessible).
120    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
121    /// Optional account ID for account message parsing.
122    account_id: Option<AccountId>,
123    /// Optional heartbeat interval in seconds.
124    heartbeat: Option<u64>,
125    /// Command channel sender to handler (wrapped in RwLock so updates are visible across clones).
126    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
127    /// Receiver for parsed Nautilus messages from handler.
128    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
129    /// Background handler task handle.
130    handler_task: Option<tokio::task::JoinHandle<()>>,
131}
132
133impl Clone for DydxWebSocketClient {
134    fn clone(&self) -> Self {
135        Self {
136            url: self.url.clone(),
137            credential: self.credential.clone(),
138            requires_auth: self.requires_auth,
139            auth_tracker: self.auth_tracker.clone(),
140            subscriptions: self.subscriptions.clone(),
141            connection_mode: self.connection_mode.clone(),
142            signal: self.signal.clone(),
143            instruments_cache: self.instruments_cache.clone(),
144            account_id: self.account_id,
145            heartbeat: self.heartbeat,
146            cmd_tx: self.cmd_tx.clone(),
147            out_rx: None,       // Cannot clone receiver - only one owner allowed
148            handler_task: None, // Cannot clone task handle
149        }
150    }
151}
152
153impl DydxWebSocketClient {
154    /// Creates a new public WebSocket client for market data.
155    #[must_use]
156    pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
157        use std::sync::atomic::AtomicU8;
158
159        // Create dummy command channel (will be replaced on connect)
160        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
161
162        Self {
163            url,
164            credential: None,
165            requires_auth: false,
166            auth_tracker: AuthTracker::new(),
167            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
168            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
169                ConnectionMode::Closed as u8,
170            ))),
171            signal: Arc::new(AtomicBool::new(false)),
172            instruments_cache: Arc::new(DashMap::new()),
173            account_id: None,
174            heartbeat: _heartbeat,
175            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
176            out_rx: None,
177            handler_task: None,
178        }
179    }
180
181    /// Creates a new private WebSocket client for account updates.
182    #[must_use]
183    pub fn new_private(
184        url: String,
185        credential: DydxCredential,
186        account_id: AccountId,
187        _heartbeat: Option<u64>,
188    ) -> Self {
189        use std::sync::atomic::AtomicU8;
190
191        // Create dummy command channel (will be replaced on connect)
192        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
193
194        Self {
195            url,
196            credential: Some(Arc::new(credential)),
197            requires_auth: true,
198            auth_tracker: AuthTracker::new(),
199            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
200            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
201                ConnectionMode::Closed as u8,
202            ))),
203            signal: Arc::new(AtomicBool::new(false)),
204            instruments_cache: Arc::new(DashMap::new()),
205            account_id: Some(account_id),
206            heartbeat: _heartbeat,
207            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
208            out_rx: None,
209            handler_task: None,
210        }
211    }
212
213    /// Returns the credential associated with this client, if any.
214    #[must_use]
215    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
216        self.credential.as_ref()
217    }
218
219    /// Returns `true` when the client is connected.
220    #[must_use]
221    pub fn is_connected(&self) -> bool {
222        let mode = self.connection_mode.load();
223        let mode_u8 = mode.load(Ordering::Relaxed);
224        matches!(
225            mode_u8,
226            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
227        )
228    }
229
230    /// Returns the URL of this WebSocket client.
231    #[must_use]
232    pub fn url(&self) -> &str {
233        &self.url
234    }
235
236    /// Returns a clone of the connection mode atomic reference.
237    ///
238    /// This is primarily used for Python bindings that need to monitor connection state.
239    #[must_use]
240    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
241        self.connection_mode.clone()
242    }
243
244    /// Sets the account ID for account message parsing.
245    pub fn set_account_id(&mut self, account_id: AccountId) {
246        self.account_id = Some(account_id);
247    }
248
249    /// Returns the account ID if set.
250    #[must_use]
251    pub fn account_id(&self) -> Option<AccountId> {
252        self.account_id
253    }
254
255    /// Caches a single instrument.
256    ///
257    /// Any existing instrument with the same ID will be replaced.
258    pub fn cache_instrument(&self, instrument: InstrumentAny) {
259        let symbol = instrument.id().symbol.inner();
260        self.instruments_cache.insert(symbol, instrument.clone());
261
262        // Before connect() the handler isn't running; this send will fail and that's expected
263        // because connect() replays the instruments via InitializeInstruments
264        if let Ok(cmd_tx) = self.cmd_tx.try_read()
265            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
266        {
267            tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
268        }
269    }
270
271    /// Caches multiple instruments.
272    ///
273    /// Any existing instruments with the same IDs will be replaced.
274    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
275        for instrument in &instruments {
276            self.instruments_cache
277                .insert(instrument.id().symbol.inner(), instrument.clone());
278        }
279
280        // Before connect() the handler isn't running; this send will fail and that's expected
281        // because connect() replays the instruments via InitializeInstruments
282        if !instruments.is_empty()
283            && let Ok(cmd_tx) = self.cmd_tx.try_read()
284            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
285        {
286            tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
287        }
288    }
289
290    /// Returns a reference to the instruments cache.
291    #[must_use]
292    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
293        &self.instruments_cache
294    }
295
296    /// Retrieves an instrument from the cache by symbol.
297    ///
298    /// Returns `None` if the instrument is not found.
299    #[must_use]
300    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
301        self.instruments_cache.get(symbol).map(|r| r.clone())
302    }
303
304    /// Takes ownership of the inbound typed message receiver.
305    /// Returns None if the receiver has already been taken or not connected.
306    pub fn take_receiver(
307        &mut self,
308    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
309        self.out_rx.take()
310    }
311
312    /// Connects the websocket client in handler mode with automatic reconnection.
313    ///
314    /// Spawns a background handler task that owns the WebSocketClient and processes
315    /// raw messages into typed [`NautilusWsMessage`] values.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the connection cannot be established.
320    pub async fn connect(&mut self) -> DydxWsResult<()> {
321        if self.is_connected() {
322            return Ok(());
323        }
324
325        // Reset stop signal from any previous disconnect
326        self.signal.store(false, Ordering::Relaxed);
327
328        let (message_handler, raw_rx) = channel_message_handler();
329
330        let cfg = WebSocketConfig {
331            url: self.url.clone(),
332            headers: vec![],
333            heartbeat: self.heartbeat,
334            heartbeat_msg: None,
335            reconnect_timeout_ms: Some(15_000),
336            reconnect_delay_initial_ms: Some(250),
337            reconnect_delay_max_ms: Some(5_000),
338            reconnect_backoff_factor: Some(2.0),
339            reconnect_jitter_ms: Some(200),
340            reconnect_max_attempts: None,
341        };
342
343        let client = WebSocketClient::connect(
344            cfg,
345            Some(message_handler),
346            None,
347            None,
348            vec![],
349            Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
350        )
351        .await
352        .map_err(|e| DydxWsError::Transport(e.to_string()))?;
353
354        // Update connection state atomically
355        self.connection_mode.store(client.connection_mode_atomic());
356
357        // Create fresh channels for this connection
358        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
359        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
360
361        // Update the shared cmd_tx so all clones see the new sender
362        {
363            let mut guard = self.cmd_tx.write().await;
364            *guard = cmd_tx;
365        }
366        self.out_rx = Some(out_rx);
367
368        // Replay cached instruments to the new handler
369        if !self.instruments_cache.is_empty() {
370            let cached_instruments: Vec<InstrumentAny> = self
371                .instruments_cache
372                .iter()
373                .map(|entry| entry.value().clone())
374                .collect();
375            let cmd_tx_guard = self.cmd_tx.read().await;
376            if let Err(e) =
377                cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
378            {
379                tracing::error!("Failed to replay instruments to handler: {e}");
380            }
381        }
382
383        // Spawn handler task
384        let account_id = self.account_id;
385        let signal = self.signal.clone();
386        let subscriptions = self.subscriptions.clone();
387
388        let handler_task = get_runtime().spawn(async move {
389            let mut handler = FeedHandler::new(
390                account_id,
391                cmd_rx,
392                out_tx,
393                raw_rx,
394                client,
395                signal,
396                subscriptions,
397            );
398            handler.run().await;
399        });
400
401        self.handler_task = Some(handler_task);
402        tracing::info!("Connected dYdX WebSocket: {}", self.url);
403        Ok(())
404    }
405
406    /// Disconnects the websocket client.
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if the underlying client cannot be accessed.
411    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
412        // Set stop signal
413        self.signal.store(true, Ordering::Relaxed);
414
415        // Reset connection mode to Closed so is_connected() returns false
416        // and subsequent connect() calls will create new channels
417        self.connection_mode
418            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
419
420        // Abort handler task if it exists
421        if let Some(handle) = self.handler_task.take() {
422            handle.abort();
423        }
424
425        // Drop receiver to stop any consumers
426        self.out_rx = None;
427
428        tracing::info!("Disconnected dYdX WebSocket");
429        Ok(())
430    }
431
432    /// Sends a text message via the handler.
433    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
434        self.cmd_tx
435            .read()
436            .await
437            .send(HandlerCommand::SendText(text.to_string()))
438            .map_err(|e| {
439                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
440            })?;
441        Ok(())
442    }
443
444    /// Sends a command to the handler.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if the handler task has terminated.
449    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
450        if let Ok(guard) = self.cmd_tx.try_read() {
451            guard.send(cmd).map_err(|e| {
452                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
453            })?;
454        } else {
455            return Err(DydxWsError::Transport(
456                "Failed to acquire lock on command channel".to_string(),
457            ));
458        }
459        Ok(())
460    }
461
462    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
463        let mut s = instrument_id.symbol.as_str().to_string();
464        if let Some(stripped) = s.strip_suffix("-PERP") {
465            s = stripped.to_string();
466        }
467        s
468    }
469
470    fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
471        match id {
472            Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
473            None => channel.as_ref().to_string(),
474        }
475    }
476
477    async fn send_and_track_subscribe(
478        &self,
479        sub: super::messages::DydxSubscription,
480        topic: &str,
481    ) -> DydxWsResult<()> {
482        self.subscriptions.mark_subscribe(topic);
483        let payload = serde_json::to_string(&sub)?;
484        if let Err(e) = self.send_text_inner(&payload).await {
485            self.subscriptions.mark_failure(topic);
486            self.subscriptions.remove_reference(topic);
487            return Err(e);
488        }
489        Ok(())
490    }
491
492    async fn send_and_track_unsubscribe(
493        &self,
494        sub: super::messages::DydxSubscription,
495        topic: &str,
496    ) -> DydxWsResult<()> {
497        self.subscriptions.mark_unsubscribe(topic);
498        let payload = serde_json::to_string(&sub)?;
499        if let Err(e) = self.send_text_inner(&payload).await {
500            // Restore reference so the subscription remains active if the unsubscribe fails.
501            self.subscriptions.add_reference(topic);
502            self.subscriptions.mark_subscribe(topic);
503            return Err(e);
504        }
505        Ok(())
506    }
507
508    /// Subscribes to public trade updates for a specific instrument.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the subscription request fails.
513    ///
514    /// # References
515    ///
516    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
517    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
518        let ticker = Self::ticker_from_instrument_id(&instrument_id);
519        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
520        if !self.subscriptions.add_reference(&topic) {
521            return Ok(());
522        }
523
524        let sub = super::messages::DydxSubscription {
525            op: super::enums::DydxWsOperation::Subscribe,
526            channel: super::enums::DydxWsChannel::Trades,
527            id: Some(ticker),
528        };
529
530        self.send_and_track_subscribe(sub, &topic).await
531    }
532
533    /// Unsubscribes from public trade updates for a specific instrument.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if the unsubscription request fails.
538    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
539        let ticker = Self::ticker_from_instrument_id(&instrument_id);
540        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
541        if !self.subscriptions.remove_reference(&topic) {
542            return Ok(());
543        }
544
545        let sub = super::messages::DydxSubscription {
546            op: super::enums::DydxWsOperation::Unsubscribe,
547            channel: super::enums::DydxWsChannel::Trades,
548            id: Some(ticker),
549        };
550
551        self.send_and_track_unsubscribe(sub, &topic).await
552    }
553
554    /// Subscribes to orderbook updates for a specific instrument.
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if the subscription request fails.
559    ///
560    /// # References
561    ///
562    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
563    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
564        let ticker = Self::ticker_from_instrument_id(&instrument_id);
565        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
566        if !self.subscriptions.add_reference(&topic) {
567            return Ok(());
568        }
569
570        let sub = super::messages::DydxSubscription {
571            op: super::enums::DydxWsOperation::Subscribe,
572            channel: super::enums::DydxWsChannel::Orderbook,
573            id: Some(ticker),
574        };
575
576        self.send_and_track_subscribe(sub, &topic).await
577    }
578
579    /// Unsubscribes from orderbook updates for a specific instrument.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if the unsubscription request fails.
584    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
585        let ticker = Self::ticker_from_instrument_id(&instrument_id);
586        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
587        if !self.subscriptions.remove_reference(&topic) {
588            return Ok(());
589        }
590
591        let sub = super::messages::DydxSubscription {
592            op: super::enums::DydxWsOperation::Unsubscribe,
593            channel: super::enums::DydxWsChannel::Orderbook,
594            id: Some(ticker),
595        };
596
597        self.send_and_track_unsubscribe(sub, &topic).await
598    }
599
600    /// Subscribes to candle/kline updates for a specific instrument.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if the subscription request fails.
605    ///
606    /// # References
607    ///
608    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
609    pub async fn subscribe_candles(
610        &self,
611        instrument_id: InstrumentId,
612        resolution: &str,
613    ) -> DydxWsResult<()> {
614        let ticker = Self::ticker_from_instrument_id(&instrument_id);
615        let id = format!("{ticker}/{resolution}");
616        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
617        if !self.subscriptions.add_reference(&topic) {
618            return Ok(());
619        }
620
621        let sub = super::messages::DydxSubscription {
622            op: super::enums::DydxWsOperation::Subscribe,
623            channel: super::enums::DydxWsChannel::Candles,
624            id: Some(id),
625        };
626
627        self.send_and_track_subscribe(sub, &topic).await
628    }
629
630    /// Unsubscribes from candle/kline updates for a specific instrument.
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if the unsubscription request fails.
635    pub async fn unsubscribe_candles(
636        &self,
637        instrument_id: InstrumentId,
638        resolution: &str,
639    ) -> DydxWsResult<()> {
640        let ticker = Self::ticker_from_instrument_id(&instrument_id);
641        let id = format!("{ticker}/{resolution}");
642        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
643        if !self.subscriptions.remove_reference(&topic) {
644            return Ok(());
645        }
646
647        let sub = super::messages::DydxSubscription {
648            op: super::enums::DydxWsOperation::Unsubscribe,
649            channel: super::enums::DydxWsChannel::Candles,
650            id: Some(id),
651        };
652
653        self.send_and_track_unsubscribe(sub, &topic).await
654    }
655
656    /// Subscribes to market updates for all instruments.
657    ///
658    /// # Errors
659    ///
660    /// Returns an error if the subscription request fails.
661    ///
662    /// # References
663    ///
664    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
665    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
666        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
667        if !self.subscriptions.add_reference(&topic) {
668            return Ok(());
669        }
670
671        let sub = super::messages::DydxSubscription {
672            op: super::enums::DydxWsOperation::Subscribe,
673            channel: super::enums::DydxWsChannel::Markets,
674            id: None,
675        };
676
677        self.send_and_track_subscribe(sub, &topic).await
678    }
679
680    /// Unsubscribes from market updates.
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the unsubscription request fails.
685    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
686        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
687        if !self.subscriptions.remove_reference(&topic) {
688            return Ok(());
689        }
690
691        let sub = super::messages::DydxSubscription {
692            op: super::enums::DydxWsOperation::Unsubscribe,
693            channel: super::enums::DydxWsChannel::Markets,
694            id: None,
695        };
696
697        self.send_and_track_unsubscribe(sub, &topic).await
698    }
699
700    /// Subscribes to subaccount updates (orders, fills, positions, balances).
701    ///
702    /// This requires authentication and will only work for private WebSocket clients
703    /// created with [`Self::new_private`].
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the client was not created with credentials or if the
708    /// subscription request fails.
709    ///
710    /// # References
711    ///
712    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
713    pub async fn subscribe_subaccount(
714        &self,
715        address: &str,
716        subaccount_number: u32,
717    ) -> DydxWsResult<()> {
718        if !self.requires_auth {
719            return Err(DydxWsError::Authentication(
720                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
721            ));
722        }
723        let id = format!("{address}/{subaccount_number}");
724        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
725        if !self.subscriptions.add_reference(&topic) {
726            return Ok(());
727        }
728
729        let sub = super::messages::DydxSubscription {
730            op: super::enums::DydxWsOperation::Subscribe,
731            channel: super::enums::DydxWsChannel::Subaccounts,
732            id: Some(id),
733        };
734
735        self.send_and_track_subscribe(sub, &topic).await
736    }
737
738    /// Unsubscribes from subaccount updates.
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the unsubscription request fails.
743    pub async fn unsubscribe_subaccount(
744        &self,
745        address: &str,
746        subaccount_number: u32,
747    ) -> DydxWsResult<()> {
748        let id = format!("{address}/{subaccount_number}");
749        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
750        if !self.subscriptions.remove_reference(&topic) {
751            return Ok(());
752        }
753
754        let sub = super::messages::DydxSubscription {
755            op: super::enums::DydxWsOperation::Unsubscribe,
756            channel: super::enums::DydxWsChannel::Subaccounts,
757            id: Some(id),
758        };
759
760        self.send_and_track_unsubscribe(sub, &topic).await
761    }
762
763    /// Subscribes to block height updates.
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the subscription request fails.
768    ///
769    /// # References
770    ///
771    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
772    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
773        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
774        if !self.subscriptions.add_reference(&topic) {
775            return Ok(());
776        }
777
778        let sub = super::messages::DydxSubscription {
779            op: super::enums::DydxWsOperation::Subscribe,
780            channel: super::enums::DydxWsChannel::BlockHeight,
781            id: None,
782        };
783
784        self.send_and_track_subscribe(sub, &topic).await
785    }
786
787    /// Unsubscribes from block height updates.
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the unsubscription request fails.
792    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
793        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
794        if !self.subscriptions.remove_reference(&topic) {
795            return Ok(());
796        }
797
798        let sub = super::messages::DydxSubscription {
799            op: super::enums::DydxWsOperation::Unsubscribe,
800            channel: super::enums::DydxWsChannel::BlockHeight,
801            id: None,
802        };
803
804        self.send_and_track_unsubscribe(sub, &topic).await
805    }
806}