Skip to main content

nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
32/// Pre-interned rate limit key for subscription operations (subscribe/unsubscribe).
33///
34/// dYdX allows up to 2 subscription messages per second per connection.
35/// See: <https://docs.dydx.trade/developers/indexer/websockets#rate-limits>
36pub static DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: LazyLock<[Ustr; 1]> =
37    LazyLock::new(|| [Ustr::from("subscription")]);
38
/// Separator placed between the channel name and the identifier when building
/// a dYdX WebSocket topic string (`channel:symbol`).
pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
41
42/// Default WebSocket quota for dYdX subscriptions (2 messages per second).
43pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
44    LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
45
46use std::{
47    num::NonZeroU32,
48    sync::{
49        Arc, LazyLock,
50        atomic::{AtomicBool, AtomicU8, Ordering},
51    },
52};
53
54use arc_swap::ArcSwap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57    identifiers::{AccountId, InstrumentId},
58    instruments::InstrumentAny,
59};
60use nautilus_network::{
61    mode::ConnectionMode,
62    ratelimiter::quota::Quota,
63    websocket::{
64        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65    },
66};
67use ustr::Ustr;
68
69use super::{
70    enums::{DydxWsChannel, DydxWsOperation, NautilusWsMessage},
71    error::{DydxWsError, DydxWsResult},
72    handler::{FeedHandler, HandlerCommand},
73    messages::DydxSubscription,
74};
75use crate::{
76    common::{credential::DydxCredential, instrument_cache::InstrumentCache},
77    execution::encoder::ClientOrderIdEncoder,
78};
79
80/// WebSocket client for dYdX v4 market data and account streams.
81///
82/// # Authentication
83///
84/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
85/// Public channels work without any credentials. Private channels (subaccounts) only
86/// need the wallet address included in the subscription message.
87///
88/// The [`DydxCredential`] stored in this client is used for:
89/// - Providing the wallet address for private channel subscriptions
90/// - Transaction signing (when placing orders via the validator node)
91///
92/// It is **NOT** used for WebSocket message signing or authentication.
93///
94/// # Architecture
95///
96/// This client follows a two-layer architecture:
97/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
98/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
99///
100/// Communication uses lock-free channels:
101/// - Commands flow from client → handler via `cmd_tx`
102/// - Parsed events flow from handler → client via `out_rx`
103#[derive(Debug)]
104#[cfg_attr(
105    feature = "python",
106    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
107)]
108pub struct DydxWebSocketClient {
109    /// The WebSocket connection URL.
110    url: String,
111    /// Optional credential for private channels (only wallet address is used).
112    credential: Option<Arc<DydxCredential>>,
113    /// Whether authentication is required for this client.
114    requires_auth: bool,
115    /// Authentication tracker for WebSocket connections.
116    auth_tracker: AuthTracker,
117    /// Subscription state tracker for managing channel subscriptions.
118    subscriptions: SubscriptionState,
119    /// Shared connection state (lock-free atomic).
120    connection_mode: Arc<ArcSwap<AtomicU8>>,
121    /// Manual disconnect signal.
122    signal: Arc<AtomicBool>,
123    /// Shared instrument cache for parsing market data.
124    ///
125    /// When constructed via `new_*_with_cache()`, this is shared with HTTP/execution clients.
126    /// When constructed via `new_public()` or `new_private()`, a new cache is created.
127    instrument_cache: Arc<InstrumentCache>,
128    /// Optional account ID for account message parsing.
129    account_id: Option<AccountId>,
130    /// Optional heartbeat interval in seconds.
131    heartbeat: Option<u64>,
132    /// Command channel sender to handler (wrapped in RwLock so updates are visible across clones).
133    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
134    /// Receiver for parsed Nautilus messages from handler.
135    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
136    /// Background handler task handle.
137    handler_task: Option<tokio::task::JoinHandle<()>>,
138    /// Whether to timestamp bars at close time (open + interval).
139    bars_timestamp_on_close: bool,
140    /// Shared client order ID encoder for bidirectional mapping.
141    encoder: Arc<ClientOrderIdEncoder>,
142}
143
144impl Clone for DydxWebSocketClient {
145    fn clone(&self) -> Self {
146        Self {
147            url: self.url.clone(),
148            credential: self.credential.clone(),
149            requires_auth: self.requires_auth,
150            auth_tracker: self.auth_tracker.clone(),
151            subscriptions: self.subscriptions.clone(),
152            connection_mode: self.connection_mode.clone(),
153            signal: self.signal.clone(),
154            instrument_cache: self.instrument_cache.clone(),
155            account_id: self.account_id,
156            heartbeat: self.heartbeat,
157            cmd_tx: self.cmd_tx.clone(),
158            out_rx: None,       // Cannot clone receiver - only one owner allowed
159            handler_task: None, // Cannot clone task handle
160            bars_timestamp_on_close: self.bars_timestamp_on_close,
161            encoder: self.encoder.clone(),
162        }
163    }
164}
165
166impl DydxWebSocketClient {
167    /// Creates a new public WebSocket client for market data.
168    ///
169    /// This creates a new independent instrument cache. To share a cache with
170    /// the HTTP client, use [`Self::new_public_with_cache`] instead.
171    #[must_use]
172    pub fn new_public(url: String, heartbeat: Option<u64>) -> Self {
173        Self::new_public_with_cache(url, Arc::new(InstrumentCache::new()), heartbeat)
174    }
175
176    /// Creates a new public WebSocket client with a shared instrument cache.
177    ///
178    /// Use this when you want to share instrument data with the HTTP client.
179    #[must_use]
180    pub fn new_public_with_cache(
181        url: String,
182        instrument_cache: Arc<InstrumentCache>,
183        heartbeat: Option<u64>,
184    ) -> Self {
185        // Create dummy command channel (will be replaced on connect)
186        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
187
188        Self {
189            url,
190            credential: None,
191            requires_auth: false,
192            auth_tracker: AuthTracker::new(),
193            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
194            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
195                ConnectionMode::Closed as u8,
196            ))),
197            signal: Arc::new(AtomicBool::new(false)),
198            instrument_cache,
199            account_id: None,
200            heartbeat,
201            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
202            out_rx: None,
203            handler_task: None,
204            bars_timestamp_on_close: true,
205            encoder: Arc::new(ClientOrderIdEncoder::new()),
206        }
207    }
208
209    /// Creates a new private WebSocket client for account updates.
210    ///
211    /// This creates a new independent instrument cache. To share a cache with
212    /// the HTTP client, use [`Self::new_private_with_cache`] instead.
213    #[must_use]
214    pub fn new_private(
215        url: String,
216        credential: DydxCredential,
217        account_id: AccountId,
218        heartbeat: Option<u64>,
219    ) -> Self {
220        Self::new_private_with_cache(
221            url,
222            credential,
223            account_id,
224            Arc::new(InstrumentCache::new()),
225            heartbeat,
226        )
227    }
228
229    /// Creates a new private WebSocket client with a shared instrument cache.
230    ///
231    /// Use this when you want to share instrument data with the HTTP client.
232    #[must_use]
233    pub fn new_private_with_cache(
234        url: String,
235        credential: DydxCredential,
236        account_id: AccountId,
237        instrument_cache: Arc<InstrumentCache>,
238        heartbeat: Option<u64>,
239    ) -> Self {
240        // Create dummy command channel (will be replaced on connect)
241        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
242
243        Self {
244            url,
245            credential: Some(Arc::new(credential)),
246            requires_auth: true,
247            auth_tracker: AuthTracker::new(),
248            subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
249            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
250                ConnectionMode::Closed as u8,
251            ))),
252            signal: Arc::new(AtomicBool::new(false)),
253            instrument_cache,
254            account_id: Some(account_id),
255            heartbeat,
256            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
257            out_rx: None,
258            handler_task: None,
259            bars_timestamp_on_close: true,
260            encoder: Arc::new(ClientOrderIdEncoder::new()),
261        }
262    }
263
264    /// Returns the credential associated with this client, if any.
265    #[must_use]
266    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
267        self.credential.as_ref()
268    }
269
270    /// Returns `true` when the client is connected.
271    #[must_use]
272    pub fn is_connected(&self) -> bool {
273        let mode = self.connection_mode.load();
274        let mode_u8 = mode.load(Ordering::Relaxed);
275        matches!(
276            mode_u8,
277            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
278        )
279    }
280
281    /// Returns the URL of this WebSocket client.
282    #[must_use]
283    pub fn url(&self) -> &str {
284        &self.url
285    }
286
287    /// Returns a clone of the connection mode atomic reference.
288    ///
289    /// This is primarily used for Python bindings that need to monitor connection state.
290    #[must_use]
291    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
292        self.connection_mode.clone()
293    }
294
295    /// Sets whether to timestamp bars at close time (open + interval).
296    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
297        self.bars_timestamp_on_close = value;
298    }
299
300    /// Sets the account ID for account message parsing.
301    pub fn set_account_id(&mut self, account_id: AccountId) {
302        self.account_id = Some(account_id);
303    }
304
305    /// Returns the account ID if set.
306    #[must_use]
307    pub fn account_id(&self) -> Option<AccountId> {
308        self.account_id
309    }
310
311    /// Replaces the instrument cache with an externally shared one.
312    ///
313    /// Use this to share the HTTP client's cache (which includes CLOB pair ID
314    /// and market ticker indices) with the WebSocket client. Must be called
315    /// before `connect()`.
316    pub fn set_instrument_cache(&mut self, cache: Arc<InstrumentCache>) {
317        self.instrument_cache = cache;
318    }
319
320    /// Caches a single instrument.
321    ///
322    /// Any existing instrument with the same ID will be replaced.
323    /// Uses the shared `InstrumentCache` for symbol-based lookups.
324    pub fn cache_instrument(&self, instrument: InstrumentAny) {
325        self.instrument_cache
326            .insert_instrument_only(instrument.clone());
327
328        // Before connect() the handler isn't running; this send will fail and that's expected
329        // because connect() replays the instruments via InitializeInstruments
330        if let Ok(cmd_tx) = self.cmd_tx.try_read()
331            && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
332        {
333            log::debug!("Failed to send UpdateInstrument command to handler: {e}");
334        }
335    }
336
337    /// Caches multiple instruments.
338    ///
339    /// Any existing instruments with the same IDs will be replaced.
340    /// Uses the shared `InstrumentCache` for symbol-based lookups.
341    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
342        log::debug!(
343            "Caching {} instruments in WebSocket client",
344            instruments.len()
345        );
346        self.instrument_cache
347            .insert_instruments_only(instruments.clone());
348
349        // Before connect() the handler isn't running; this send will fail and that's expected
350        // because connect() replays the instruments via InitializeInstruments
351        if !instruments.is_empty()
352            && let Ok(cmd_tx) = self.cmd_tx.try_read()
353            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
354        {
355            log::debug!("Failed to send InitializeInstruments command to handler: {e}");
356        }
357    }
358
359    /// Returns a reference to the shared instrument cache.
360    #[must_use]
361    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
362        &self.instrument_cache
363    }
364
365    /// Returns a reference to the shared client order ID encoder.
366    #[must_use]
367    pub fn encoder(&self) -> &Arc<ClientOrderIdEncoder> {
368        &self.encoder
369    }
370
371    /// Returns all cached instruments.
372    ///
373    /// This is a snapshot of the current cache contents.
374    #[must_use]
375    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
376        self.instrument_cache.all_instruments()
377    }
378
379    /// Returns the number of cached instruments.
380    #[must_use]
381    pub fn cached_instruments_count(&self) -> usize {
382        self.instrument_cache.len()
383    }
384
385    /// Retrieves an instrument from the cache by InstrumentId.
386    ///
387    /// Returns `None` if the instrument is not found.
388    #[must_use]
389    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
390        self.instrument_cache.get(instrument_id)
391    }
392
393    /// Retrieves an instrument from the cache by market ticker (e.g., "BTC-USD").
394    ///
395    /// Returns `None` if the instrument is not found.
396    #[must_use]
397    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
398        self.instrument_cache.get_by_market(ticker)
399    }
400
401    /// Takes ownership of the inbound typed message receiver.
402    /// Returns None if the receiver has already been taken or not connected.
403    pub fn take_receiver(
404        &mut self,
405    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
406        self.out_rx.take()
407    }
408
409    /// Returns a stream of typed WebSocket messages.
410    ///
411    /// Takes ownership of the message receiver and returns it as a `Stream`.
412    ///
413    /// # Panics
414    ///
415    /// Panics if the receiver has already been taken.
416    pub fn stream(
417        &mut self,
418    ) -> impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static {
419        let mut rx = self
420            .out_rx
421            .take()
422            .expect("Message stream receiver already taken or not connected");
423
424        async_stream::stream! {
425            while let Some(msg) = rx.recv().await {
426                yield msg;
427            }
428        }
429    }
430
431    /// Connects the websocket client in handler mode with automatic reconnection.
432    ///
433    /// Spawns a background handler task that owns the WebSocketClient and processes
434    /// raw messages into typed [`NautilusWsMessage`] values.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the connection cannot be established.
439    pub async fn connect(&mut self) -> DydxWsResult<()> {
440        if self.is_connected() {
441            return Ok(());
442        }
443
444        // Reset stop signal from any previous disconnect
445        self.signal.store(false, Ordering::Relaxed);
446
447        let (message_handler, raw_rx) = channel_message_handler();
448
449        let cfg = WebSocketConfig {
450            url: self.url.clone(),
451            headers: vec![],
452            heartbeat: self.heartbeat,
453            heartbeat_msg: None,
454            reconnect_timeout_ms: Some(15_000),
455            reconnect_delay_initial_ms: Some(250),
456            reconnect_delay_max_ms: Some(5_000),
457            reconnect_backoff_factor: Some(2.0),
458            reconnect_jitter_ms: Some(200),
459            reconnect_max_attempts: None,
460        };
461
462        let client = WebSocketClient::connect(
463            cfg,
464            Some(message_handler),
465            None,
466            None,
467            vec![],
468            Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
469        )
470        .await
471        .map_err(|e| DydxWsError::Transport(e.to_string()))?;
472
473        // Update connection state atomically
474        self.connection_mode.store(client.connection_mode_atomic());
475
476        // Create fresh channels for this connection
477        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
478        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
479
480        // Update the shared cmd_tx so all clones see the new sender
481        {
482            let mut guard = self.cmd_tx.write().await;
483            *guard = cmd_tx;
484        }
485        self.out_rx = Some(out_rx);
486
487        // Replay cached instruments to the new handler
488        if self.instrument_cache.is_empty() {
489            log::warn!("No cached instruments to replay to WebSocket handler");
490        } else {
491            let cached_instruments = self.instrument_cache.all_instruments();
492            log::debug!(
493                "Replaying {} cached instruments to WebSocket handler",
494                cached_instruments.len()
495            );
496            let cmd_tx_guard = self.cmd_tx.read().await;
497            if let Err(e) =
498                cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
499            {
500                log::error!("Failed to replay instruments to handler: {e}");
501            }
502        }
503
504        // Spawn handler task
505        let account_id = self.account_id;
506        let signal = self.signal.clone();
507        let subscriptions = self.subscriptions.clone();
508        let bars_timestamp_on_close = self.bars_timestamp_on_close;
509
510        let handler_task = get_runtime().spawn(async move {
511            let mut handler = FeedHandler::new(
512                account_id,
513                cmd_rx,
514                out_tx,
515                raw_rx,
516                client,
517                signal,
518                subscriptions,
519                bars_timestamp_on_close,
520            );
521            handler.run().await;
522        });
523
524        self.handler_task = Some(handler_task);
525        log::info!("Connected dYdX WebSocket: {}", self.url);
526        Ok(())
527    }
528
529    /// Disconnects the websocket client.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if the underlying client cannot be accessed.
534    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
535        // Set stop signal
536        self.signal.store(true, Ordering::Relaxed);
537
538        // Reset connection mode to Closed so is_connected() returns false
539        // and subsequent connect() calls will create new channels
540        self.connection_mode
541            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
542
543        // Abort handler task if it exists
544        if let Some(handle) = self.handler_task.take() {
545            handle.abort();
546        }
547
548        // Drop receiver to stop any consumers
549        self.out_rx = None;
550
551        log::info!("Disconnected dYdX WebSocket");
552        Ok(())
553    }
554
555    /// Sends a text message via the handler.
556    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
557        self.cmd_tx
558            .read()
559            .await
560            .send(HandlerCommand::SendText(text.to_string()))
561            .map_err(|e| {
562                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
563            })?;
564        Ok(())
565    }
566
567    /// Sends a command to the handler.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the handler task has terminated.
572    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
573        if let Ok(guard) = self.cmd_tx.try_read() {
574            guard.send(cmd).map_err(|e| {
575                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
576            })?;
577        } else {
578            return Err(DydxWsError::Transport(
579                "Failed to acquire lock on command channel".to_string(),
580            ));
581        }
582        Ok(())
583    }
584
585    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
586        let mut s = instrument_id.symbol.as_str().to_string();
587        if let Some(stripped) = s.strip_suffix("-PERP") {
588            s = stripped.to_string();
589        }
590        s
591    }
592
593    fn topic(channel: DydxWsChannel, id: Option<&str>) -> String {
594        match id {
595            Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
596            None => channel.as_ref().to_string(),
597        }
598    }
599
600    async fn send_and_track_subscribe(
601        &self,
602        sub: DydxSubscription,
603        topic: &str,
604    ) -> DydxWsResult<()> {
605        self.subscriptions.mark_subscribe(topic);
606
607        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
608            let _ = cmd_tx.send(HandlerCommand::RegisterSubscription {
609                topic: topic.to_string(),
610                subscription: sub.clone(),
611            });
612        }
613
614        let payload = serde_json::to_string(&sub)?;
615        if let Err(e) = self.send_text_inner(&payload).await {
616            self.subscriptions.mark_failure(topic);
617            self.subscriptions.remove_reference(topic);
618            return Err(e);
619        }
620        Ok(())
621    }
622
623    async fn send_and_track_unsubscribe(
624        &self,
625        sub: DydxSubscription,
626        topic: &str,
627    ) -> DydxWsResult<()> {
628        self.subscriptions.mark_unsubscribe(topic);
629
630        let payload = serde_json::to_string(&sub)?;
631        if let Err(e) = self.send_text_inner(&payload).await {
632            self.subscriptions.add_reference(topic);
633            self.subscriptions.mark_subscribe(topic);
634            return Err(e);
635        }
636
637        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
638            let _ = cmd_tx.send(HandlerCommand::UnregisterSubscription {
639                topic: topic.to_string(),
640            });
641        }
642
643        Ok(())
644    }
645
646    /// Subscribes to public trade updates for a specific instrument.
647    ///
648    /// # Errors
649    ///
650    /// Returns an error if the subscription request fails.
651    ///
652    /// # References
653    ///
654    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
655    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
656        let ticker = Self::ticker_from_instrument_id(&instrument_id);
657        let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
658        if !self.subscriptions.add_reference(&topic) {
659            return Ok(());
660        }
661
662        let sub = DydxSubscription {
663            op: DydxWsOperation::Subscribe,
664            channel: DydxWsChannel::Trades,
665            id: Some(ticker),
666        };
667
668        self.send_and_track_subscribe(sub, &topic).await
669    }
670
671    /// Unsubscribes from public trade updates for a specific instrument.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the unsubscription request fails.
676    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
677        let ticker = Self::ticker_from_instrument_id(&instrument_id);
678        let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
679        if !self.subscriptions.remove_reference(&topic) {
680            return Ok(());
681        }
682
683        let sub = DydxSubscription {
684            op: DydxWsOperation::Unsubscribe,
685            channel: DydxWsChannel::Trades,
686            id: Some(ticker),
687        };
688
689        self.send_and_track_unsubscribe(sub, &topic).await
690    }
691
692    /// Subscribes to orderbook updates for a specific instrument.
693    ///
694    /// # Errors
695    ///
696    /// Returns an error if the subscription request fails.
697    ///
698    /// # References
699    ///
700    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
701    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
702        let ticker = Self::ticker_from_instrument_id(&instrument_id);
703        let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
704        if !self.subscriptions.add_reference(&topic) {
705            return Ok(());
706        }
707
708        let sub = DydxSubscription {
709            op: DydxWsOperation::Subscribe,
710            channel: DydxWsChannel::Orderbook,
711            id: Some(ticker),
712        };
713
714        self.send_and_track_subscribe(sub, &topic).await
715    }
716
717    /// Unsubscribes from orderbook updates for a specific instrument.
718    ///
719    /// # Errors
720    ///
721    /// Returns an error if the unsubscription request fails.
722    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
723        let ticker = Self::ticker_from_instrument_id(&instrument_id);
724        let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
725        if !self.subscriptions.remove_reference(&topic) {
726            return Ok(());
727        }
728
729        let sub = DydxSubscription {
730            op: DydxWsOperation::Unsubscribe,
731            channel: DydxWsChannel::Orderbook,
732            id: Some(ticker),
733        };
734
735        self.send_and_track_unsubscribe(sub, &topic).await
736    }
737
738    /// Subscribes to candle/kline updates for a specific instrument.
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the subscription request fails.
743    ///
744    /// # References
745    ///
746    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
747    pub async fn subscribe_candles(
748        &self,
749        instrument_id: InstrumentId,
750        resolution: &str,
751    ) -> DydxWsResult<()> {
752        let ticker = Self::ticker_from_instrument_id(&instrument_id);
753        let id = format!("{ticker}/{resolution}");
754        let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
755        if !self.subscriptions.add_reference(&topic) {
756            return Ok(());
757        }
758
759        let sub = DydxSubscription {
760            op: DydxWsOperation::Subscribe,
761            channel: DydxWsChannel::Candles,
762            id: Some(id),
763        };
764
765        self.send_and_track_subscribe(sub, &topic).await
766    }
767
768    /// Unsubscribes from candle/kline updates for a specific instrument.
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the unsubscription request fails.
773    pub async fn unsubscribe_candles(
774        &self,
775        instrument_id: InstrumentId,
776        resolution: &str,
777    ) -> DydxWsResult<()> {
778        let ticker = Self::ticker_from_instrument_id(&instrument_id);
779        let id = format!("{ticker}/{resolution}");
780        let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
781        if !self.subscriptions.remove_reference(&topic) {
782            return Ok(());
783        }
784
785        let sub = DydxSubscription {
786            op: DydxWsOperation::Unsubscribe,
787            channel: DydxWsChannel::Candles,
788            id: Some(id),
789        };
790
791        self.send_and_track_unsubscribe(sub, &topic).await
792    }
793
794    /// Subscribes to market updates for all instruments.
795    ///
796    /// # Errors
797    ///
798    /// Returns an error if the subscription request fails.
799    ///
800    /// # References
801    ///
802    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
803    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
804        let topic = Self::topic(DydxWsChannel::Markets, None);
805        if !self.subscriptions.add_reference(&topic) {
806            return Ok(());
807        }
808
809        let sub = DydxSubscription {
810            op: DydxWsOperation::Subscribe,
811            channel: DydxWsChannel::Markets,
812            id: None,
813        };
814
815        self.send_and_track_subscribe(sub, &topic).await
816    }
817
818    /// Unsubscribes from market updates.
819    ///
820    /// # Errors
821    ///
822    /// Returns an error if the unsubscription request fails.
823    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
824        let topic = Self::topic(DydxWsChannel::Markets, None);
825        if !self.subscriptions.remove_reference(&topic) {
826            return Ok(());
827        }
828
829        let sub = DydxSubscription {
830            op: DydxWsOperation::Unsubscribe,
831            channel: DydxWsChannel::Markets,
832            id: None,
833        };
834
835        self.send_and_track_unsubscribe(sub, &topic).await
836    }
837
838    /// Subscribes to subaccount updates (orders, fills, positions, balances).
839    ///
840    /// This requires authentication and will only work for private WebSocket clients
841    /// created with [`Self::new_private`].
842    ///
843    /// # Errors
844    ///
845    /// Returns an error if the client was not created with credentials or if the
846    /// subscription request fails.
847    ///
848    /// # References
849    ///
850    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
851    pub async fn subscribe_subaccount(
852        &self,
853        address: &str,
854        subaccount_number: u32,
855    ) -> DydxWsResult<()> {
856        if !self.requires_auth {
857            return Err(DydxWsError::Authentication(
858                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
859            ));
860        }
861        let id = format!("{address}/{subaccount_number}");
862        let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
863        if !self.subscriptions.add_reference(&topic) {
864            return Ok(());
865        }
866
867        let sub = DydxSubscription {
868            op: DydxWsOperation::Subscribe,
869            channel: DydxWsChannel::Subaccounts,
870            id: Some(id),
871        };
872
873        self.send_and_track_subscribe(sub, &topic).await
874    }
875
876    /// Unsubscribes from subaccount updates.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the unsubscription request fails.
881    pub async fn unsubscribe_subaccount(
882        &self,
883        address: &str,
884        subaccount_number: u32,
885    ) -> DydxWsResult<()> {
886        let id = format!("{address}/{subaccount_number}");
887        let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
888        if !self.subscriptions.remove_reference(&topic) {
889            return Ok(());
890        }
891
892        let sub = DydxSubscription {
893            op: DydxWsOperation::Unsubscribe,
894            channel: DydxWsChannel::Subaccounts,
895            id: Some(id),
896        };
897
898        self.send_and_track_unsubscribe(sub, &topic).await
899    }
900
901    /// Subscribes to block height updates.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the subscription request fails.
906    ///
907    /// # References
908    ///
909    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
910    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
911        let topic = Self::topic(DydxWsChannel::BlockHeight, None);
912        if !self.subscriptions.add_reference(&topic) {
913            return Ok(());
914        }
915
916        let sub = DydxSubscription {
917            op: DydxWsOperation::Subscribe,
918            channel: DydxWsChannel::BlockHeight,
919            id: None,
920        };
921
922        self.send_and_track_subscribe(sub, &topic).await
923    }
924
925    /// Unsubscribes from block height updates.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the unsubscription request fails.
930    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
931        let topic = Self::topic(DydxWsChannel::BlockHeight, None);
932        if !self.subscriptions.remove_reference(&topic) {
933            return Ok(());
934        }
935
936        let sub = DydxSubscription {
937            op: DydxWsOperation::Unsubscribe,
938            channel: DydxWsChannel::BlockHeight,
939            id: None,
940        };
941
942        self.send_and_track_unsubscribe(sub, &topic).await
943    }
944}