nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
32/// Rate limit key for subscription operations (subscribe/unsubscribe).
33///
34/// dYdX allows up to 2 subscription messages per second per connection.
35/// See: <https://docs.dydx.trade/developers/indexer/websockets#rate-limits>
36pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";
37
38/// WebSocket topic delimiter for dYdX (channel:symbol format).
39pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
40
41/// Default WebSocket quota for dYdX subscriptions (2 messages per second).
42pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
43    LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46    num::NonZeroU32,
47    sync::{
48        Arc, LazyLock,
49        atomic::{AtomicBool, AtomicU8, Ordering},
50    },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_model::{
56    identifiers::{AccountId, InstrumentId},
57    instruments::{Instrument, InstrumentAny},
58};
59use nautilus_network::{
60    mode::ConnectionMode,
61    ratelimiter::quota::Quota,
62    websocket::{
63        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
64    },
65};
66use ustr::Ustr;
67
68use super::{
69    enums::NautilusWsMessage,
70    error::{DydxWsError, DydxWsResult},
71    handler::{FeedHandler, HandlerCommand},
72};
73use crate::common::credential::DydxCredential;
74
/// WebSocket client for dYdX v4 market data and account streams.
///
/// # Authentication
///
/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
/// Public channels work without any credentials. Private channels (subaccounts) only
/// need the wallet address included in the subscription message.
///
/// The [`DydxCredential`] stored in this client is used for:
/// - Providing the wallet address for private channel subscriptions
/// - Transaction signing (when placing orders via the validator node)
///
/// It is **NOT** used for WebSocket message signing or authentication.
///
/// # Architecture
///
/// This client follows a two-layer architecture:
/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
///
/// Communication uses unbounded Tokio channels:
/// - Commands flow from client → handler via `cmd_tx`
/// - Parsed events flow from handler → client via `out_rx`
///
/// # Cloning
///
/// Clones share the `Arc`-backed state (connection mode, stop signal, instrument cache and
/// command sender) but never carry the message receiver or the handler task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxWebSocketClient {
    /// The WebSocket connection URL, handed to the underlying client on `connect()`.
    url: String,
    /// Optional credential for private channels (only wallet address is used).
    /// `None` for clients built with `new_public`.
    credential: Option<Arc<DydxCredential>>,
    /// Whether this client was built for private channels; checked before
    /// subscribing to the subaccounts channel.
    requires_auth: bool,
    /// Authentication tracker for WebSocket connections.
    auth_tracker: AuthTracker,
    /// Subscription state tracker for managing channel subscriptions.
    /// A clone is handed to the handler task on `connect()`.
    subscriptions: SubscriptionState,
    /// Shared connection state. Starts as `Closed`, is swapped for the underlying client's
    /// mode atomic on `connect()`, and is reset to a fresh `Closed` value on `disconnect()`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Manual disconnect signal: set on `disconnect()`, cleared at the start of `connect()`.
    signal: Arc<AtomicBool>,
    /// Cached instruments for parsing market data (Python-accessible), keyed by symbol.
    /// The whole cache is replayed to the handler on every `connect()`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Optional account ID for account message parsing; passed to the handler on `connect()`.
    account_id: Option<AccountId>,
    /// Optional heartbeat interval in seconds, forwarded to the WebSocket configuration.
    heartbeat: Option<u64>,
    /// Command channel sender to handler (wrapped in RwLock so updates are visible across clones).
    /// Holds a placeholder sender with no receiver until `connect()` installs a live one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for parsed Nautilus messages from handler.
    /// `Some` only between `connect()` and `take_receiver()`/`disconnect()`.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Background handler task handle; set by `connect()` and aborted by `disconnect()`.
    handler_task: Option<tokio::task::JoinHandle<()>>,
}
131
132impl Clone for DydxWebSocketClient {
133    fn clone(&self) -> Self {
134        Self {
135            url: self.url.clone(),
136            credential: self.credential.clone(),
137            requires_auth: self.requires_auth,
138            auth_tracker: self.auth_tracker.clone(),
139            subscriptions: self.subscriptions.clone(),
140            connection_mode: self.connection_mode.clone(),
141            signal: self.signal.clone(),
142            instruments_cache: self.instruments_cache.clone(),
143            account_id: self.account_id,
144            heartbeat: self.heartbeat,
145            cmd_tx: self.cmd_tx.clone(),
146            out_rx: None,       // Cannot clone receiver - only one owner allowed
147            handler_task: None, // Cannot clone task handle
148        }
149    }
150}
151
152impl DydxWebSocketClient {
153    /// Creates a new public WebSocket client for market data.
154    #[must_use]
155    pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
156        use std::sync::atomic::AtomicU8;
157
158        // Create dummy command channel (will be replaced on connect)
159        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
160
161        Self {
162            url,
163            credential: None,
164            requires_auth: false,
165            auth_tracker: AuthTracker::new(),
166            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
167            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
168                ConnectionMode::Closed as u8,
169            ))),
170            signal: Arc::new(AtomicBool::new(false)),
171            instruments_cache: Arc::new(DashMap::new()),
172            account_id: None,
173            heartbeat: _heartbeat,
174            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
175            out_rx: None,
176            handler_task: None,
177        }
178    }
179
180    /// Creates a new private WebSocket client for account updates.
181    #[must_use]
182    pub fn new_private(
183        url: String,
184        credential: DydxCredential,
185        account_id: AccountId,
186        _heartbeat: Option<u64>,
187    ) -> Self {
188        use std::sync::atomic::AtomicU8;
189
190        // Create dummy command channel (will be replaced on connect)
191        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
192
193        Self {
194            url,
195            credential: Some(Arc::new(credential)),
196            requires_auth: true,
197            auth_tracker: AuthTracker::new(),
198            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
199            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
200                ConnectionMode::Closed as u8,
201            ))),
202            signal: Arc::new(AtomicBool::new(false)),
203            instruments_cache: Arc::new(DashMap::new()),
204            account_id: Some(account_id),
205            heartbeat: _heartbeat,
206            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
207            out_rx: None,
208            handler_task: None,
209        }
210    }
211
212    /// Returns the credential associated with this client, if any.
213    #[must_use]
214    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
215        self.credential.as_ref()
216    }
217
218    /// Returns `true` when the client is connected.
219    #[must_use]
220    pub fn is_connected(&self) -> bool {
221        let mode = self.connection_mode.load();
222        let mode_u8 = mode.load(Ordering::Relaxed);
223        matches!(
224            mode_u8,
225            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
226        )
227    }
228
229    /// Returns the URL of this WebSocket client.
230    #[must_use]
231    pub fn url(&self) -> &str {
232        &self.url
233    }
234
235    /// Returns a clone of the connection mode atomic reference.
236    ///
237    /// This is primarily used for Python bindings that need to monitor connection state.
238    #[must_use]
239    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
240        self.connection_mode.clone()
241    }
242
243    /// Sets the account ID for account message parsing.
244    pub fn set_account_id(&mut self, account_id: AccountId) {
245        self.account_id = Some(account_id);
246    }
247
    /// Returns the account ID if set.
    ///
    /// The ID is set by `new_private` or `set_account_id`; clients built with
    /// `new_public` start with `None`.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
253
254    /// Caches a single instrument.
255    ///
256    /// Any existing instrument with the same ID will be replaced.
257    pub fn cache_instrument(&self, instrument: InstrumentAny) {
258        let symbol = instrument.id().symbol.inner();
259        self.instruments_cache.insert(symbol, instrument.clone());
260
261        // Before connect() the handler isn't running; this send will fail and that's expected
262        // because connect() replays the instruments via InitializeInstruments
263        if let Ok(cmd_tx) = self.cmd_tx.try_read()
264            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
265        {
266            tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
267        }
268    }
269
270    /// Caches multiple instruments.
271    ///
272    /// Any existing instruments with the same IDs will be replaced.
273    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
274        for instrument in &instruments {
275            self.instruments_cache
276                .insert(instrument.id().symbol.inner(), instrument.clone());
277        }
278
279        // Before connect() the handler isn't running; this send will fail and that's expected
280        // because connect() replays the instruments via InitializeInstruments
281        if !instruments.is_empty()
282            && let Ok(cmd_tx) = self.cmd_tx.try_read()
283            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
284        {
285            tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
286        }
287    }
288
    /// Returns a reference to the instruments cache.
    ///
    /// The map is keyed by instrument symbol and is the same shared map that
    /// `cache_instrument` and `cache_instruments` write to.
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
294
295    /// Retrieves an instrument from the cache by symbol.
296    ///
297    /// Returns `None` if the instrument is not found.
298    #[must_use]
299    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
300        self.instruments_cache.get(symbol).map(|r| r.clone())
301    }
302
303    /// Takes ownership of the inbound typed message receiver.
304    /// Returns None if the receiver has already been taken or not connected.
305    pub fn take_receiver(
306        &mut self,
307    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
308        self.out_rx.take()
309    }
310
311    /// Connects the websocket client in handler mode with automatic reconnection.
312    ///
313    /// Spawns a background handler task that owns the WebSocketClient and processes
314    /// raw messages into typed [`NautilusWsMessage`] values.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if the connection cannot be established.
319    pub async fn connect(&mut self) -> DydxWsResult<()> {
320        if self.is_connected() {
321            return Ok(());
322        }
323
324        // Reset stop signal from any previous disconnect
325        self.signal.store(false, Ordering::Relaxed);
326
327        let (message_handler, raw_rx) = channel_message_handler();
328
329        let cfg = WebSocketConfig {
330            url: self.url.clone(),
331            headers: vec![],
332            message_handler: Some(message_handler),
333            heartbeat: self.heartbeat,
334            heartbeat_msg: None,
335            ping_handler: None,
336            reconnect_timeout_ms: Some(15_000),
337            reconnect_delay_initial_ms: Some(250),
338            reconnect_delay_max_ms: Some(5_000),
339            reconnect_backoff_factor: Some(2.0),
340            reconnect_jitter_ms: Some(200),
341            reconnect_max_attempts: None,
342        };
343
344        let client = WebSocketClient::connect(cfg, None, vec![], Some(*DYDX_WS_SUBSCRIPTION_QUOTA))
345            .await
346            .map_err(|e| DydxWsError::Transport(e.to_string()))?;
347
348        // Update connection state atomically
349        self.connection_mode.store(client.connection_mode_atomic());
350
351        // Create fresh channels for this connection
352        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
353        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
354
355        // Update the shared cmd_tx so all clones see the new sender
356        {
357            let mut guard = self.cmd_tx.write().await;
358            *guard = cmd_tx;
359        }
360        self.out_rx = Some(out_rx);
361
362        // Replay cached instruments to the new handler
363        if !self.instruments_cache.is_empty() {
364            let cached_instruments: Vec<InstrumentAny> = self
365                .instruments_cache
366                .iter()
367                .map(|entry| entry.value().clone())
368                .collect();
369            let cmd_tx_guard = self.cmd_tx.read().await;
370            if let Err(e) =
371                cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
372            {
373                tracing::error!("Failed to replay instruments to handler: {e}");
374            }
375        }
376
377        // Spawn handler task
378        let account_id = self.account_id;
379        let signal = self.signal.clone();
380        let subscriptions = self.subscriptions.clone();
381
382        let handler_task = tokio::spawn(async move {
383            let mut handler = FeedHandler::new(
384                account_id,
385                cmd_rx,
386                out_tx,
387                raw_rx,
388                client,
389                signal,
390                subscriptions,
391            );
392            handler.run().await;
393        });
394
395        self.handler_task = Some(handler_task);
396        tracing::info!("Connected dYdX WebSocket: {}", self.url);
397        Ok(())
398    }
399
400    /// Disconnects the websocket client.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the underlying client cannot be accessed.
405    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
406        // Set stop signal
407        self.signal.store(true, Ordering::Relaxed);
408
409        // Reset connection mode to Closed so is_connected() returns false
410        // and subsequent connect() calls will create new channels
411        self.connection_mode
412            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
413
414        // Abort handler task if it exists
415        if let Some(handle) = self.handler_task.take() {
416            handle.abort();
417        }
418
419        // Drop receiver to stop any consumers
420        self.out_rx = None;
421
422        tracing::info!("Disconnected dYdX WebSocket");
423        Ok(())
424    }
425
426    /// Sends a text message via the handler.
427    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
428        self.cmd_tx
429            .read()
430            .await
431            .send(HandlerCommand::SendText(text.to_string()))
432            .map_err(|e| {
433                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
434            })?;
435        Ok(())
436    }
437
438    /// Sends a command to the handler.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if the handler task has terminated.
443    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
444        if let Ok(guard) = self.cmd_tx.try_read() {
445            guard.send(cmd).map_err(|e| {
446                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
447            })?;
448        } else {
449            return Err(DydxWsError::Transport(
450                "Failed to acquire lock on command channel".to_string(),
451            ));
452        }
453        Ok(())
454    }
455
456    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
457        let mut s = instrument_id.symbol.as_str().to_string();
458        if let Some(stripped) = s.strip_suffix("-PERP") {
459            s = stripped.to_string();
460        }
461        s
462    }
463
464    fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
465        match id {
466            Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
467            None => channel.as_ref().to_string(),
468        }
469    }
470
471    async fn send_and_track_subscribe(
472        &self,
473        sub: super::messages::DydxSubscription,
474        topic: &str,
475    ) -> DydxWsResult<()> {
476        self.subscriptions.mark_subscribe(topic);
477        let payload = serde_json::to_string(&sub)?;
478        if let Err(e) = self.send_text_inner(&payload).await {
479            self.subscriptions.mark_failure(topic);
480            self.subscriptions.remove_reference(topic);
481            return Err(e);
482        }
483        Ok(())
484    }
485
486    async fn send_and_track_unsubscribe(
487        &self,
488        sub: super::messages::DydxSubscription,
489        topic: &str,
490    ) -> DydxWsResult<()> {
491        self.subscriptions.mark_unsubscribe(topic);
492        let payload = serde_json::to_string(&sub)?;
493        if let Err(e) = self.send_text_inner(&payload).await {
494            // Restore reference so the subscription remains active if the unsubscribe fails.
495            self.subscriptions.add_reference(topic);
496            self.subscriptions.mark_subscribe(topic);
497            return Err(e);
498        }
499        Ok(())
500    }
501
502    /// Subscribes to public trade updates for a specific instrument.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the subscription request fails.
507    ///
508    /// # References
509    ///
510    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
511    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
512        let ticker = Self::ticker_from_instrument_id(&instrument_id);
513        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
514        if !self.subscriptions.add_reference(&topic) {
515            return Ok(());
516        }
517
518        let sub = super::messages::DydxSubscription {
519            op: super::enums::DydxWsOperation::Subscribe,
520            channel: super::enums::DydxWsChannel::Trades,
521            id: Some(ticker),
522        };
523
524        self.send_and_track_subscribe(sub, &topic).await
525    }
526
527    /// Unsubscribes from public trade updates for a specific instrument.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the unsubscription request fails.
532    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
533        let ticker = Self::ticker_from_instrument_id(&instrument_id);
534        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
535        if !self.subscriptions.remove_reference(&topic) {
536            return Ok(());
537        }
538
539        let sub = super::messages::DydxSubscription {
540            op: super::enums::DydxWsOperation::Unsubscribe,
541            channel: super::enums::DydxWsChannel::Trades,
542            id: Some(ticker),
543        };
544
545        self.send_and_track_unsubscribe(sub, &topic).await
546    }
547
548    /// Subscribes to orderbook updates for a specific instrument.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the subscription request fails.
553    ///
554    /// # References
555    ///
556    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
557    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
558        let ticker = Self::ticker_from_instrument_id(&instrument_id);
559        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
560        if !self.subscriptions.add_reference(&topic) {
561            return Ok(());
562        }
563
564        let sub = super::messages::DydxSubscription {
565            op: super::enums::DydxWsOperation::Subscribe,
566            channel: super::enums::DydxWsChannel::Orderbook,
567            id: Some(ticker),
568        };
569
570        self.send_and_track_subscribe(sub, &topic).await
571    }
572
573    /// Unsubscribes from orderbook updates for a specific instrument.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the unsubscription request fails.
578    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
579        let ticker = Self::ticker_from_instrument_id(&instrument_id);
580        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
581        if !self.subscriptions.remove_reference(&topic) {
582            return Ok(());
583        }
584
585        let sub = super::messages::DydxSubscription {
586            op: super::enums::DydxWsOperation::Unsubscribe,
587            channel: super::enums::DydxWsChannel::Orderbook,
588            id: Some(ticker),
589        };
590
591        self.send_and_track_unsubscribe(sub, &topic).await
592    }
593
594    /// Subscribes to candle/kline updates for a specific instrument.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if the subscription request fails.
599    ///
600    /// # References
601    ///
602    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
603    pub async fn subscribe_candles(
604        &self,
605        instrument_id: InstrumentId,
606        resolution: &str,
607    ) -> DydxWsResult<()> {
608        let ticker = Self::ticker_from_instrument_id(&instrument_id);
609        let id = format!("{ticker}/{resolution}");
610        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
611        if !self.subscriptions.add_reference(&topic) {
612            return Ok(());
613        }
614
615        let sub = super::messages::DydxSubscription {
616            op: super::enums::DydxWsOperation::Subscribe,
617            channel: super::enums::DydxWsChannel::Candles,
618            id: Some(id),
619        };
620
621        self.send_and_track_subscribe(sub, &topic).await
622    }
623
624    /// Unsubscribes from candle/kline updates for a specific instrument.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the unsubscription request fails.
629    pub async fn unsubscribe_candles(
630        &self,
631        instrument_id: InstrumentId,
632        resolution: &str,
633    ) -> DydxWsResult<()> {
634        let ticker = Self::ticker_from_instrument_id(&instrument_id);
635        let id = format!("{ticker}/{resolution}");
636        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
637        if !self.subscriptions.remove_reference(&topic) {
638            return Ok(());
639        }
640
641        let sub = super::messages::DydxSubscription {
642            op: super::enums::DydxWsOperation::Unsubscribe,
643            channel: super::enums::DydxWsChannel::Candles,
644            id: Some(id),
645        };
646
647        self.send_and_track_unsubscribe(sub, &topic).await
648    }
649
650    /// Subscribes to market updates for all instruments.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the subscription request fails.
655    ///
656    /// # References
657    ///
658    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
659    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
660        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
661        if !self.subscriptions.add_reference(&topic) {
662            return Ok(());
663        }
664
665        let sub = super::messages::DydxSubscription {
666            op: super::enums::DydxWsOperation::Subscribe,
667            channel: super::enums::DydxWsChannel::Markets,
668            id: None,
669        };
670
671        self.send_and_track_subscribe(sub, &topic).await
672    }
673
674    /// Unsubscribes from market updates.
675    ///
676    /// # Errors
677    ///
678    /// Returns an error if the unsubscription request fails.
679    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
680        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
681        if !self.subscriptions.remove_reference(&topic) {
682            return Ok(());
683        }
684
685        let sub = super::messages::DydxSubscription {
686            op: super::enums::DydxWsOperation::Unsubscribe,
687            channel: super::enums::DydxWsChannel::Markets,
688            id: None,
689        };
690
691        self.send_and_track_unsubscribe(sub, &topic).await
692    }
693
694    /// Subscribes to subaccount updates (orders, fills, positions, balances).
695    ///
696    /// This requires authentication and will only work for private WebSocket clients
697    /// created with [`Self::new_private`].
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if the client was not created with credentials or if the
702    /// subscription request fails.
703    ///
704    /// # References
705    ///
706    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
707    pub async fn subscribe_subaccount(
708        &self,
709        address: &str,
710        subaccount_number: u32,
711    ) -> DydxWsResult<()> {
712        if !self.requires_auth {
713            return Err(DydxWsError::Authentication(
714                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
715            ));
716        }
717        let id = format!("{address}/{subaccount_number}");
718        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
719        if !self.subscriptions.add_reference(&topic) {
720            return Ok(());
721        }
722
723        let sub = super::messages::DydxSubscription {
724            op: super::enums::DydxWsOperation::Subscribe,
725            channel: super::enums::DydxWsChannel::Subaccounts,
726            id: Some(id),
727        };
728
729        self.send_and_track_subscribe(sub, &topic).await
730    }
731
732    /// Unsubscribes from subaccount updates.
733    ///
734    /// # Errors
735    ///
736    /// Returns an error if the unsubscription request fails.
737    pub async fn unsubscribe_subaccount(
738        &self,
739        address: &str,
740        subaccount_number: u32,
741    ) -> DydxWsResult<()> {
742        let id = format!("{address}/{subaccount_number}");
743        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
744        if !self.subscriptions.remove_reference(&topic) {
745            return Ok(());
746        }
747
748        let sub = super::messages::DydxSubscription {
749            op: super::enums::DydxWsOperation::Unsubscribe,
750            channel: super::enums::DydxWsChannel::Subaccounts,
751            id: Some(id),
752        };
753
754        self.send_and_track_unsubscribe(sub, &topic).await
755    }
756
757    /// Subscribes to block height updates.
758    ///
759    /// # Errors
760    ///
761    /// Returns an error if the subscription request fails.
762    ///
763    /// # References
764    ///
765    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
766    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
767        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
768        if !self.subscriptions.add_reference(&topic) {
769            return Ok(());
770        }
771
772        let sub = super::messages::DydxSubscription {
773            op: super::enums::DydxWsOperation::Subscribe,
774            channel: super::enums::DydxWsChannel::BlockHeight,
775            id: None,
776        };
777
778        self.send_and_track_subscribe(sub, &topic).await
779    }
780
781    /// Unsubscribes from block height updates.
782    ///
783    /// # Errors
784    ///
785    /// Returns an error if the unsubscription request fails.
786    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
787        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
788        if !self.subscriptions.remove_reference(&topic) {
789            return Ok(());
790        }
791
792        let sub = super::messages::DydxSubscription {
793            op: super::enums::DydxWsOperation::Unsubscribe,
794            channel: super::enums::DydxWsChannel::BlockHeight,
795            id: None,
796        };
797
798        self.send_and_track_unsubscribe(sub, &topic).await
799    }
800}