nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
/// Rate limit key for subscription operations (subscribe/unsubscribe).
///
/// dYdX allows up to 2 subscription messages per second per connection.
/// See: <https://docs.dydx.trade/developers/indexer/websockets#rate-limits>
pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";

/// WebSocket topic delimiter for dYdX (channel:symbol format).
///
/// Used when building topic strings for channels that carry an identifier and when
/// constructing the client's `SubscriptionState`.
pub const DYDX_WS_TOPIC_DELIMITER: char = ':';

/// Default WebSocket quota for dYdX subscriptions (2 messages per second).
///
/// Built lazily on first access and handed to `WebSocketClient::connect` when
/// `DydxWebSocketClient::connect` opens the connection.
pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46    num::NonZeroU32,
47    sync::{
48        Arc, LazyLock,
49        atomic::{AtomicBool, AtomicU8, Ordering},
50    },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57    identifiers::{AccountId, InstrumentId},
58    instruments::{Instrument, InstrumentAny},
59};
60use nautilus_network::{
61    mode::ConnectionMode,
62    ratelimiter::quota::Quota,
63    websocket::{
64        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65    },
66};
67use ustr::Ustr;
68
69use super::{
70    enums::NautilusWsMessage,
71    error::{DydxWsError, DydxWsResult},
72    handler::{FeedHandler, HandlerCommand},
73};
74use crate::common::credential::DydxCredential;
75
/// WebSocket client for dYdX v4 market data and account streams.
///
/// # Authentication
///
/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
/// Public channels work without any credentials. Private channels (subaccounts) only
/// need the wallet address included in the subscription message.
///
/// The [`DydxCredential`] stored in this client is used for:
/// - Providing the wallet address for private channel subscriptions
/// - Transaction signing (when placing orders via the validator node)
///
/// It is **NOT** used for WebSocket message signing or authentication.
///
/// # Architecture
///
/// This client follows a two-layer architecture:
/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
///
/// Communication uses lock-free channels:
/// - Commands flow from client → handler via `cmd_tx`
/// - Parsed events flow from handler → client via `out_rx`
///
/// # Cloning
///
/// Clones share the `Arc`-backed fields (connection mode, stop signal, instrument cache and
/// command sender) with the original, but never carry `out_rx` or `handler_task`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxWebSocketClient {
    /// The WebSocket connection URL (copied into the `WebSocketConfig` on every connect).
    url: String,
    /// Optional credential for private channels (only wallet address is used).
    ///
    /// `Some` only for clients created with `new_private`.
    credential: Option<Arc<DydxCredential>>,
    /// Whether authentication is required for this client.
    ///
    /// `true` for clients created with `new_private`; `subscribe_subaccount` refuses to run
    /// when this is `false`.
    requires_auth: bool,
    /// Authentication tracker for WebSocket connections.
    auth_tracker: AuthTracker,
    /// Subscription state tracker for managing channel subscriptions.
    ///
    /// A clone is handed to the handler task on connect.
    subscriptions: SubscriptionState,
    /// Shared connection state (lock-free atomic).
    ///
    /// Starts as `Closed`, is swapped for the underlying client's own atomic on connect and
    /// is swapped back to a fresh `Closed` value on disconnect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Manual disconnect signal.
    ///
    /// Set to `true` by `disconnect`, cleared by `connect`, and shared with the handler task.
    signal: Arc<AtomicBool>,
    /// Cached instruments for parsing market data (Python-accessible).
    ///
    /// Keyed by the instrument's symbol.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Optional account ID for account message parsing (passed to the handler on connect).
    account_id: Option<AccountId>,
    /// Optional heartbeat interval in seconds (forwarded to the `WebSocketConfig`).
    heartbeat: Option<u64>,
    /// Command channel sender to handler (wrapped in RwLock so updates are visible across clones).
    ///
    /// Holds a placeholder sender until `connect` installs a live one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for parsed Nautilus messages from handler.
    ///
    /// Populated by `connect`, handed out once via `take_receiver`, cleared by `disconnect`.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Background handler task handle (set by `connect`, aborted by `disconnect`).
    handler_task: Option<tokio::task::JoinHandle<()>>,
}
132
133impl Clone for DydxWebSocketClient {
134    fn clone(&self) -> Self {
135        Self {
136            url: self.url.clone(),
137            credential: self.credential.clone(),
138            requires_auth: self.requires_auth,
139            auth_tracker: self.auth_tracker.clone(),
140            subscriptions: self.subscriptions.clone(),
141            connection_mode: self.connection_mode.clone(),
142            signal: self.signal.clone(),
143            instruments_cache: self.instruments_cache.clone(),
144            account_id: self.account_id,
145            heartbeat: self.heartbeat,
146            cmd_tx: self.cmd_tx.clone(),
147            out_rx: None,       // Cannot clone receiver - only one owner allowed
148            handler_task: None, // Cannot clone task handle
149        }
150    }
151}
152
153impl DydxWebSocketClient {
154    /// Creates a new public WebSocket client for market data.
155    #[must_use]
156    pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
157        // Create dummy command channel (will be replaced on connect)
158        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
159
160        Self {
161            url,
162            credential: None,
163            requires_auth: false,
164            auth_tracker: AuthTracker::new(),
165            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
166            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
167                ConnectionMode::Closed as u8,
168            ))),
169            signal: Arc::new(AtomicBool::new(false)),
170            instruments_cache: Arc::new(DashMap::new()),
171            account_id: None,
172            heartbeat: _heartbeat,
173            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
174            out_rx: None,
175            handler_task: None,
176        }
177    }
178
179    /// Creates a new private WebSocket client for account updates.
180    #[must_use]
181    pub fn new_private(
182        url: String,
183        credential: DydxCredential,
184        account_id: AccountId,
185        _heartbeat: Option<u64>,
186    ) -> Self {
187        // Create dummy command channel (will be replaced on connect)
188        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
189
190        Self {
191            url,
192            credential: Some(Arc::new(credential)),
193            requires_auth: true,
194            auth_tracker: AuthTracker::new(),
195            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
196            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
197                ConnectionMode::Closed as u8,
198            ))),
199            signal: Arc::new(AtomicBool::new(false)),
200            instruments_cache: Arc::new(DashMap::new()),
201            account_id: Some(account_id),
202            heartbeat: _heartbeat,
203            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
204            out_rx: None,
205            handler_task: None,
206        }
207    }
208
    /// Returns the credential associated with this client, if any.
    ///
    /// `None` for clients created with [`Self::new_public`]; clients created with
    /// [`Self::new_private`] always hold one.
    #[must_use]
    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
        self.credential.as_ref()
    }
214
215    /// Returns `true` when the client is connected.
216    #[must_use]
217    pub fn is_connected(&self) -> bool {
218        let mode = self.connection_mode.load();
219        let mode_u8 = mode.load(Ordering::Relaxed);
220        matches!(
221            mode_u8,
222            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
223        )
224    }
225
226    /// Returns the URL of this WebSocket client.
227    #[must_use]
228    pub fn url(&self) -> &str {
229        &self.url
230    }
231
232    /// Returns a clone of the connection mode atomic reference.
233    ///
234    /// This is primarily used for Python bindings that need to monitor connection state.
235    #[must_use]
236    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
237        self.connection_mode.clone()
238    }
239
    /// Sets the account ID for account message parsing.
    ///
    /// Replaces any previously stored ID. The value is read when [`Self::connect`] spawns
    /// the handler, so a change made after connecting only applies to the next connect.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
244
    /// Returns the account ID if set.
    ///
    /// The ID is returned by value.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
250
251    /// Caches a single instrument.
252    ///
253    /// Any existing instrument with the same ID will be replaced.
254    pub fn cache_instrument(&self, instrument: InstrumentAny) {
255        let symbol = instrument.id().symbol.inner();
256        self.instruments_cache.insert(symbol, instrument.clone());
257
258        // Before connect() the handler isn't running; this send will fail and that's expected
259        // because connect() replays the instruments via InitializeInstruments
260        if let Ok(cmd_tx) = self.cmd_tx.try_read()
261            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
262        {
263            log::debug!("Failed to send UpdateInstrument command to handler: {e}");
264        }
265    }
266
267    /// Caches multiple instruments.
268    ///
269    /// Any existing instruments with the same IDs will be replaced.
270    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
271        for instrument in &instruments {
272            self.instruments_cache
273                .insert(instrument.id().symbol.inner(), instrument.clone());
274        }
275
276        // Before connect() the handler isn't running; this send will fail and that's expected
277        // because connect() replays the instruments via InitializeInstruments
278        if !instruments.is_empty()
279            && let Ok(cmd_tx) = self.cmd_tx.try_read()
280            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
281        {
282            log::debug!("Failed to send InitializeInstruments command to handler: {e}");
283        }
284    }
285
    /// Returns a reference to the instruments cache.
    ///
    /// The map is keyed by instrument symbol and is the same `Arc` shared by all clones of
    /// this client.
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
291
292    /// Retrieves an instrument from the cache by symbol.
293    ///
294    /// Returns `None` if the instrument is not found.
295    #[must_use]
296    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
297        self.instruments_cache.get(symbol).map(|r| r.clone())
298    }
299
    /// Takes ownership of the inbound typed message receiver.
    /// Returns None if the receiver has already been taken or not connected.
    ///
    /// The receiver is created by [`Self::connect`] and cleared by [`Self::disconnect`];
    /// clones of the client never hold one.
    pub fn take_receiver(
        &mut self,
    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
        self.out_rx.take()
    }
307
    /// Connects the websocket client in handler mode with automatic reconnection.
    ///
    /// Spawns a background handler task that owns the WebSocketClient and processes
    /// raw messages into typed [`NautilusWsMessage`] values.
    ///
    /// Calling this while [`Self::is_connected`] is `true` is a no-op that returns `Ok(())`.
    /// On success a fresh command channel replaces the shared `cmd_tx` (so clones see it),
    /// a new outbound receiver becomes available through [`Self::take_receiver`], and every
    /// instrument currently in the cache is replayed to the new handler.
    ///
    /// # Errors
    ///
    /// Returns [`DydxWsError::Transport`] if the connection cannot be established.
    pub async fn connect(&mut self) -> DydxWsResult<()> {
        if self.is_connected() {
            return Ok(());
        }

        // Reset stop signal from any previous disconnect
        self.signal.store(false, Ordering::Relaxed);

        // Raw frames from the socket arrive on `raw_rx`, which is handed to the handler below
        let (message_handler, raw_rx) = channel_message_handler();

        // Fixed reconnection settings for this adapter.
        // NOTE(review): `reconnect_max_attempts: None` presumably means unlimited retries;
        // confirm against `WebSocketConfig`.
        let cfg = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![],
            heartbeat: self.heartbeat,
            heartbeat_msg: None,
            reconnect_timeout_ms: Some(15_000),
            reconnect_delay_initial_ms: Some(250),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(2.0),
            reconnect_jitter_ms: Some(200),
            reconnect_max_attempts: None,
        };

        // The two `None`s and the empty `vec![]` leave optional arguments unset; the last
        // argument applies the subscription quota. Any connect error becomes a Transport error.
        let client = WebSocketClient::connect(
            cfg,
            Some(message_handler),
            None,
            None,
            vec![],
            Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
        )
        .await
        .map_err(|e| DydxWsError::Transport(e.to_string()))?;

        // Update connection state atomically
        // (from here on is_connected() reads the underlying client's own mode atomic)
        self.connection_mode.store(client.connection_mode_atomic());

        // Create fresh channels for this connection
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();

        // Update the shared cmd_tx so all clones see the new sender
        {
            let mut guard = self.cmd_tx.write().await;
            *guard = cmd_tx;
        }
        self.out_rx = Some(out_rx);

        // Replay cached instruments to the new handler
        // (the command is queued on the fresh channel and waits there until the handler,
        // spawned below with `cmd_rx`, starts receiving)
        if !self.instruments_cache.is_empty() {
            let cached_instruments: Vec<InstrumentAny> = self
                .instruments_cache
                .iter()
                .map(|entry| entry.value().clone())
                .collect();
            let cmd_tx_guard = self.cmd_tx.read().await;
            if let Err(e) =
                cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
            {
                log::error!("Failed to replay instruments to handler: {e}");
            }
        }

        // Spawn handler task
        // (values are copied/cloned first because the task takes ownership of everything it uses)
        let account_id = self.account_id;
        let signal = self.signal.clone();
        let subscriptions = self.subscriptions.clone();

        let handler_task = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                account_id,
                cmd_rx,
                out_tx,
                raw_rx,
                client,
                signal,
                subscriptions,
            );
            handler.run().await;
        });

        // NOTE(review): a handle left over from an earlier connect() is overwritten here
        // without being aborted; presumably that task has already ended. Confirm.
        self.handler_task = Some(handler_task);
        log::info!("Connected dYdX WebSocket: {}", self.url);
        Ok(())
    }
401
402    /// Disconnects the websocket client.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the underlying client cannot be accessed.
407    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
408        // Set stop signal
409        self.signal.store(true, Ordering::Relaxed);
410
411        // Reset connection mode to Closed so is_connected() returns false
412        // and subsequent connect() calls will create new channels
413        self.connection_mode
414            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
415
416        // Abort handler task if it exists
417        if let Some(handle) = self.handler_task.take() {
418            handle.abort();
419        }
420
421        // Drop receiver to stop any consumers
422        self.out_rx = None;
423
424        log::info!("Disconnected dYdX WebSocket");
425        Ok(())
426    }
427
428    /// Sends a text message via the handler.
429    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
430        self.cmd_tx
431            .read()
432            .await
433            .send(HandlerCommand::SendText(text.to_string()))
434            .map_err(|e| {
435                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
436            })?;
437        Ok(())
438    }
439
440    /// Sends a command to the handler.
441    ///
442    /// # Errors
443    ///
444    /// Returns an error if the handler task has terminated.
445    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
446        if let Ok(guard) = self.cmd_tx.try_read() {
447            guard.send(cmd).map_err(|e| {
448                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
449            })?;
450        } else {
451            return Err(DydxWsError::Transport(
452                "Failed to acquire lock on command channel".to_string(),
453            ));
454        }
455        Ok(())
456    }
457
458    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
459        let mut s = instrument_id.symbol.as_str().to_string();
460        if let Some(stripped) = s.strip_suffix("-PERP") {
461            s = stripped.to_string();
462        }
463        s
464    }
465
466    fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
467        match id {
468            Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
469            None => channel.as_ref().to_string(),
470        }
471    }
472
473    async fn send_and_track_subscribe(
474        &self,
475        sub: super::messages::DydxSubscription,
476        topic: &str,
477    ) -> DydxWsResult<()> {
478        self.subscriptions.mark_subscribe(topic);
479        let payload = serde_json::to_string(&sub)?;
480        if let Err(e) = self.send_text_inner(&payload).await {
481            self.subscriptions.mark_failure(topic);
482            self.subscriptions.remove_reference(topic);
483            return Err(e);
484        }
485        Ok(())
486    }
487
488    async fn send_and_track_unsubscribe(
489        &self,
490        sub: super::messages::DydxSubscription,
491        topic: &str,
492    ) -> DydxWsResult<()> {
493        self.subscriptions.mark_unsubscribe(topic);
494        let payload = serde_json::to_string(&sub)?;
495        if let Err(e) = self.send_text_inner(&payload).await {
496            // Restore reference so the subscription remains active if the unsubscribe fails.
497            self.subscriptions.add_reference(topic);
498            self.subscriptions.mark_subscribe(topic);
499            return Err(e);
500        }
501        Ok(())
502    }
503
504    /// Subscribes to public trade updates for a specific instrument.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the subscription request fails.
509    ///
510    /// # References
511    ///
512    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
513    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
514        let ticker = Self::ticker_from_instrument_id(&instrument_id);
515        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
516        if !self.subscriptions.add_reference(&topic) {
517            return Ok(());
518        }
519
520        let sub = super::messages::DydxSubscription {
521            op: super::enums::DydxWsOperation::Subscribe,
522            channel: super::enums::DydxWsChannel::Trades,
523            id: Some(ticker),
524        };
525
526        self.send_and_track_subscribe(sub, &topic).await
527    }
528
529    /// Unsubscribes from public trade updates for a specific instrument.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if the unsubscription request fails.
534    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
535        let ticker = Self::ticker_from_instrument_id(&instrument_id);
536        let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
537        if !self.subscriptions.remove_reference(&topic) {
538            return Ok(());
539        }
540
541        let sub = super::messages::DydxSubscription {
542            op: super::enums::DydxWsOperation::Unsubscribe,
543            channel: super::enums::DydxWsChannel::Trades,
544            id: Some(ticker),
545        };
546
547        self.send_and_track_unsubscribe(sub, &topic).await
548    }
549
550    /// Subscribes to orderbook updates for a specific instrument.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the subscription request fails.
555    ///
556    /// # References
557    ///
558    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
559    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
560        let ticker = Self::ticker_from_instrument_id(&instrument_id);
561        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
562        if !self.subscriptions.add_reference(&topic) {
563            return Ok(());
564        }
565
566        let sub = super::messages::DydxSubscription {
567            op: super::enums::DydxWsOperation::Subscribe,
568            channel: super::enums::DydxWsChannel::Orderbook,
569            id: Some(ticker),
570        };
571
572        self.send_and_track_subscribe(sub, &topic).await
573    }
574
575    /// Unsubscribes from orderbook updates for a specific instrument.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the unsubscription request fails.
580    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
581        let ticker = Self::ticker_from_instrument_id(&instrument_id);
582        let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
583        if !self.subscriptions.remove_reference(&topic) {
584            return Ok(());
585        }
586
587        let sub = super::messages::DydxSubscription {
588            op: super::enums::DydxWsOperation::Unsubscribe,
589            channel: super::enums::DydxWsChannel::Orderbook,
590            id: Some(ticker),
591        };
592
593        self.send_and_track_unsubscribe(sub, &topic).await
594    }
595
596    /// Subscribes to candle/kline updates for a specific instrument.
597    ///
598    /// # Errors
599    ///
600    /// Returns an error if the subscription request fails.
601    ///
602    /// # References
603    ///
604    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
605    pub async fn subscribe_candles(
606        &self,
607        instrument_id: InstrumentId,
608        resolution: &str,
609    ) -> DydxWsResult<()> {
610        let ticker = Self::ticker_from_instrument_id(&instrument_id);
611        let id = format!("{ticker}/{resolution}");
612        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
613        if !self.subscriptions.add_reference(&topic) {
614            return Ok(());
615        }
616
617        let sub = super::messages::DydxSubscription {
618            op: super::enums::DydxWsOperation::Subscribe,
619            channel: super::enums::DydxWsChannel::Candles,
620            id: Some(id),
621        };
622
623        self.send_and_track_subscribe(sub, &topic).await
624    }
625
626    /// Unsubscribes from candle/kline updates for a specific instrument.
627    ///
628    /// # Errors
629    ///
630    /// Returns an error if the unsubscription request fails.
631    pub async fn unsubscribe_candles(
632        &self,
633        instrument_id: InstrumentId,
634        resolution: &str,
635    ) -> DydxWsResult<()> {
636        let ticker = Self::ticker_from_instrument_id(&instrument_id);
637        let id = format!("{ticker}/{resolution}");
638        let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
639        if !self.subscriptions.remove_reference(&topic) {
640            return Ok(());
641        }
642
643        let sub = super::messages::DydxSubscription {
644            op: super::enums::DydxWsOperation::Unsubscribe,
645            channel: super::enums::DydxWsChannel::Candles,
646            id: Some(id),
647        };
648
649        self.send_and_track_unsubscribe(sub, &topic).await
650    }
651
652    /// Subscribes to market updates for all instruments.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if the subscription request fails.
657    ///
658    /// # References
659    ///
660    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
661    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
662        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
663        if !self.subscriptions.add_reference(&topic) {
664            return Ok(());
665        }
666
667        let sub = super::messages::DydxSubscription {
668            op: super::enums::DydxWsOperation::Subscribe,
669            channel: super::enums::DydxWsChannel::Markets,
670            id: None,
671        };
672
673        self.send_and_track_subscribe(sub, &topic).await
674    }
675
676    /// Unsubscribes from market updates.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the unsubscription request fails.
681    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
682        let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
683        if !self.subscriptions.remove_reference(&topic) {
684            return Ok(());
685        }
686
687        let sub = super::messages::DydxSubscription {
688            op: super::enums::DydxWsOperation::Unsubscribe,
689            channel: super::enums::DydxWsChannel::Markets,
690            id: None,
691        };
692
693        self.send_and_track_unsubscribe(sub, &topic).await
694    }
695
696    /// Subscribes to subaccount updates (orders, fills, positions, balances).
697    ///
698    /// This requires authentication and will only work for private WebSocket clients
699    /// created with [`Self::new_private`].
700    ///
701    /// # Errors
702    ///
703    /// Returns an error if the client was not created with credentials or if the
704    /// subscription request fails.
705    ///
706    /// # References
707    ///
708    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
709    pub async fn subscribe_subaccount(
710        &self,
711        address: &str,
712        subaccount_number: u32,
713    ) -> DydxWsResult<()> {
714        if !self.requires_auth {
715            return Err(DydxWsError::Authentication(
716                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
717            ));
718        }
719        let id = format!("{address}/{subaccount_number}");
720        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
721        if !self.subscriptions.add_reference(&topic) {
722            return Ok(());
723        }
724
725        let sub = super::messages::DydxSubscription {
726            op: super::enums::DydxWsOperation::Subscribe,
727            channel: super::enums::DydxWsChannel::Subaccounts,
728            id: Some(id),
729        };
730
731        self.send_and_track_subscribe(sub, &topic).await
732    }
733
734    /// Unsubscribes from subaccount updates.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the unsubscription request fails.
739    pub async fn unsubscribe_subaccount(
740        &self,
741        address: &str,
742        subaccount_number: u32,
743    ) -> DydxWsResult<()> {
744        let id = format!("{address}/{subaccount_number}");
745        let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
746        if !self.subscriptions.remove_reference(&topic) {
747            return Ok(());
748        }
749
750        let sub = super::messages::DydxSubscription {
751            op: super::enums::DydxWsOperation::Unsubscribe,
752            channel: super::enums::DydxWsChannel::Subaccounts,
753            id: Some(id),
754        };
755
756        self.send_and_track_unsubscribe(sub, &topic).await
757    }
758
759    /// Subscribes to block height updates.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the subscription request fails.
764    ///
765    /// # References
766    ///
767    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
768    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
769        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
770        if !self.subscriptions.add_reference(&topic) {
771            return Ok(());
772        }
773
774        let sub = super::messages::DydxSubscription {
775            op: super::enums::DydxWsOperation::Subscribe,
776            channel: super::enums::DydxWsChannel::BlockHeight,
777            id: None,
778        };
779
780        self.send_and_track_subscribe(sub, &topic).await
781    }
782
783    /// Unsubscribes from block height updates.
784    ///
785    /// # Errors
786    ///
787    /// Returns an error if the unsubscription request fails.
788    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
789        let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
790        if !self.subscriptions.remove_reference(&topic) {
791            return Ok(());
792        }
793
794        let sub = super::messages::DydxSubscription {
795            op: super::enums::DydxWsOperation::Unsubscribe,
796            channel: super::enums::DydxWsChannel::BlockHeight,
797            id: None,
798        };
799
800        self.send_and_track_unsubscribe(sub, &topic).await
801    }
802}