nautilus_dydx/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for dYdX v4 API.
17//!
18//! This client provides streaming connectivity to dYdX's WebSocket API for both
19//! public market data and private account updates.
20//!
21//! # Authentication
22//!
23//! dYdX v4 uses Cosmos SDK wallet-based authentication. Unlike traditional exchanges:
24//! - **Public channels** require no authentication.
25//! - **Private channels** (subaccounts) only require the wallet address in the subscription message.
26//! - No signature or API key is needed for WebSocket connections themselves.
27//!
28//! # References
29//!
30//! <https://docs.dydx.trade/developers/indexer/websockets>
31
32use std::sync::{
33    Arc,
34    atomic::{AtomicBool, AtomicU8, Ordering},
35};
36
37use arc_swap::ArcSwap;
38use dashmap::DashMap;
39use nautilus_model::{
40    identifiers::{AccountId, InstrumentId},
41    instruments::{Instrument, InstrumentAny},
42};
43use nautilus_network::{
44    mode::ConnectionMode,
45    websocket::{
46        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
47    },
48};
49use ustr::Ustr;
50
51use super::{
52    error::{DydxWsError, DydxWsResult},
53    handler::{FeedHandler, HandlerCommand},
54    messages::NautilusWsMessage,
55};
56use crate::common::credential::DydxCredential;
57
58/// WebSocket client for dYdX v4 market data and account streams.
59///
60/// # Authentication
61///
62/// dYdX v4 does not require traditional API key signatures for WebSocket connections.
63/// Public channels work without any credentials. Private channels (subaccounts) only
64/// need the wallet address included in the subscription message.
65///
66/// The [`DydxCredential`] stored in this client is used for:
67/// - Providing the wallet address for private channel subscriptions
68/// - Transaction signing (when placing orders via the validator node)
69///
70/// It is **NOT** used for WebSocket message signing or authentication.
71///
72/// # Architecture
73///
74/// This client follows a two-layer architecture:
75/// - **Outer client** (this struct): Orchestrates connection and maintains Python-accessible state
76/// - **Inner handler**: Owns WebSocketClient exclusively and processes messages in a dedicated task
77///
78/// Communication uses lock-free channels:
79/// - Commands flow from client → handler via `cmd_tx`
80/// - Parsed events flow from handler → client via `out_rx`
81#[derive(Debug)]
82#[cfg_attr(
83    feature = "python",
84    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
85)]
86pub struct DydxWebSocketClient {
87    /// The WebSocket connection URL.
88    url: String,
89    /// Optional credential for private channels (only wallet address is used).
90    credential: Option<Arc<DydxCredential>>,
91    /// Whether authentication is required for this client.
92    requires_auth: bool,
93    /// Authentication tracker for WebSocket connections.
94    #[allow(dead_code)]
95    auth_tracker: AuthTracker,
96    /// Subscription state tracker for managing channel subscriptions.
97    #[allow(dead_code)]
98    subscriptions: SubscriptionState,
99    /// Shared connection state (lock-free atomic).
100    connection_mode: Arc<ArcSwap<AtomicU8>>,
101    /// Manual disconnect signal.
102    signal: Arc<AtomicBool>,
103    /// Cached instruments for parsing market data (Python-accessible).
104    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
105    /// Optional account ID for account message parsing.
106    account_id: Option<AccountId>,
107    /// Optional heartbeat interval in seconds.
108    heartbeat: Option<u64>,
109    /// Command channel sender to handler.
110    cmd_tx: Arc<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>,
111    /// Receiver for parsed Nautilus messages from handler.
112    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
113    /// Background handler task handle.
114    handler_task: Option<tokio::task::JoinHandle<()>>,
115}
116
117impl Clone for DydxWebSocketClient {
118    fn clone(&self) -> Self {
119        Self {
120            url: self.url.clone(),
121            credential: self.credential.clone(),
122            requires_auth: self.requires_auth,
123            auth_tracker: AuthTracker::new(),
124            subscriptions: SubscriptionState::new(':'),
125            connection_mode: self.connection_mode.clone(),
126            signal: self.signal.clone(),
127            instruments_cache: self.instruments_cache.clone(),
128            account_id: self.account_id,
129            heartbeat: self.heartbeat,
130            cmd_tx: self.cmd_tx.clone(),
131            out_rx: None,       // Cannot clone receiver - only one owner allowed
132            handler_task: None, // Cannot clone task handle
133        }
134    }
135}
136
137impl DydxWebSocketClient {
138    /// Creates a new public WebSocket client for market data.
139    #[must_use]
140    pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
141        use std::sync::atomic::AtomicU8;
142
143        // Create dummy command channel (will be replaced on connect)
144        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
145
146        Self {
147            url,
148            credential: None,
149            requires_auth: false,
150            auth_tracker: AuthTracker::new(),
151            subscriptions: SubscriptionState::new(':'), // dYdX uses ":" as delimiter (e.g., "v4_markets:BTC-USD")
152            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
153                ConnectionMode::Closed as u8,
154            ))),
155            signal: Arc::new(AtomicBool::new(false)),
156            instruments_cache: Arc::new(DashMap::new()),
157            account_id: None,
158            heartbeat: _heartbeat,
159            cmd_tx: Arc::new(cmd_tx),
160            out_rx: None,
161            handler_task: None,
162        }
163    }
164
165    /// Creates a new private WebSocket client for account updates.
166    #[must_use]
167    pub fn new_private(
168        url: String,
169        credential: DydxCredential,
170        account_id: AccountId,
171        _heartbeat: Option<u64>,
172    ) -> Self {
173        use std::sync::atomic::AtomicU8;
174
175        // Create dummy command channel (will be replaced on connect)
176        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
177
178        Self {
179            url,
180            credential: Some(Arc::new(credential)),
181            requires_auth: true,
182            auth_tracker: AuthTracker::new(),
183            subscriptions: SubscriptionState::new(':'), // dYdX uses ":" as delimiter
184            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
185                ConnectionMode::Closed as u8,
186            ))),
187            signal: Arc::new(AtomicBool::new(false)),
188            instruments_cache: Arc::new(DashMap::new()),
189            account_id: Some(account_id),
190            heartbeat: _heartbeat,
191            cmd_tx: Arc::new(cmd_tx),
192            out_rx: None,
193            handler_task: None,
194        }
195    }
196
197    /// Returns the credential associated with this client, if any.
198    #[must_use]
199    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
200        self.credential.as_ref()
201    }
202
203    /// Returns `true` when the client is connected.
204    #[must_use]
205    pub fn is_connected(&self) -> bool {
206        let mode = self.connection_mode.load();
207        let mode_u8 = mode.load(Ordering::Relaxed);
208        matches!(
209            mode_u8,
210            x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
211        )
212    }
213
214    /// Returns the URL of this WebSocket client.
215    #[must_use]
216    pub fn url(&self) -> &str {
217        &self.url
218    }
219
220    /// Returns a clone of the connection mode atomic reference.
221    ///
222    /// This is primarily used for Python bindings that need to monitor connection state.
223    #[must_use]
224    pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
225        self.connection_mode.clone()
226    }
227
228    /// Sets the account ID for account message parsing.
229    pub fn set_account_id(&mut self, account_id: AccountId) {
230        self.account_id = Some(account_id);
231    }
232
233    /// Returns the account ID if set.
234    #[must_use]
235    pub fn account_id(&self) -> Option<AccountId> {
236        self.account_id
237    }
238
239    /// Caches a single instrument.
240    ///
241    /// Any existing instrument with the same ID will be replaced.
242    pub fn cache_instrument(&self, instrument: InstrumentAny) {
243        let symbol = instrument.id().symbol.inner();
244        self.instruments_cache.insert(symbol, instrument.clone());
245
246        // Send command to handler if connected
247        if let Err(e) = self
248            .cmd_tx
249            .send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
250        {
251            tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
252        }
253    }
254
255    /// Caches multiple instruments.
256    ///
257    /// Any existing instruments with the same IDs will be replaced.
258    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
259        for instrument in &instruments {
260            self.instruments_cache
261                .insert(instrument.id().symbol.inner(), instrument.clone());
262        }
263
264        // Send command to handler if connected
265        if let Err(e) = self
266            .cmd_tx
267            .send(HandlerCommand::InitializeInstruments(instruments))
268        {
269            tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
270        }
271    }
272
273    /// Returns a reference to the instruments cache.
274    #[must_use]
275    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
276        &self.instruments_cache
277    }
278
279    /// Retrieves an instrument from the cache by symbol.
280    ///
281    /// Returns `None` if the instrument is not found.
282    #[must_use]
283    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
284        self.instruments_cache.get(symbol).map(|r| r.clone())
285    }
286
287    /// Takes ownership of the inbound typed message receiver.
288    /// Returns None if the receiver has already been taken or not connected.
289    pub fn take_receiver(
290        &mut self,
291    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
292        self.out_rx.take()
293    }
294
295    /// Connects the websocket client in handler mode with automatic reconnection.
296    ///
297    /// Spawns a background handler task that owns the WebSocketClient and processes
298    /// raw messages into typed [`NautilusWsMessage`] values.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the connection cannot be established.
303    pub async fn connect(&mut self) -> DydxWsResult<()> {
304        if self.is_connected() {
305            return Ok(());
306        }
307
308        let (message_handler, raw_rx) = channel_message_handler();
309
310        let cfg = WebSocketConfig {
311            url: self.url.clone(),
312            headers: vec![],
313            message_handler: Some(message_handler),
314            heartbeat: self.heartbeat,
315            heartbeat_msg: None,
316            ping_handler: None,
317            reconnect_timeout_ms: Some(15_000),
318            reconnect_delay_initial_ms: Some(250),
319            reconnect_delay_max_ms: Some(5_000),
320            reconnect_backoff_factor: Some(2.0),
321            reconnect_jitter_ms: Some(200),
322            reconnect_max_attempts: None,
323        };
324
325        let client = WebSocketClient::connect(cfg, None, vec![], None)
326            .await
327            .map_err(|e| DydxWsError::Transport(e.to_string()))?;
328
329        // Update connection state atomically
330        self.connection_mode.store(client.connection_mode_atomic());
331
332        // Create fresh channels for this connection
333        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
334        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
335
336        self.cmd_tx = Arc::new(cmd_tx.clone());
337        self.out_rx = Some(out_rx);
338
339        // Replay cached instruments to the new handler
340        if !self.instruments_cache.is_empty() {
341            let cached_instruments: Vec<InstrumentAny> = self
342                .instruments_cache
343                .iter()
344                .map(|entry| entry.value().clone())
345                .collect();
346            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
347                tracing::error!("Failed to replay instruments to handler: {e}");
348            }
349        }
350
351        // Spawn handler task
352        let account_id = self.account_id;
353        let signal = self.signal.clone();
354
355        let handler_task = tokio::spawn(async move {
356            let mut handler = FeedHandler::new(account_id, cmd_rx, out_tx, raw_rx, client, signal);
357            handler.run().await;
358        });
359
360        self.handler_task = Some(handler_task);
361        tracing::info!("Connected dYdX WebSocket: {}", self.url);
362        Ok(())
363    }
364
365    /// Disconnects the websocket client.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if the underlying client cannot be accessed.
370    pub async fn disconnect(&mut self) -> DydxWsResult<()> {
371        // Set stop signal
372        self.signal.store(true, Ordering::Relaxed);
373
374        // Abort handler task if it exists
375        if let Some(handle) = self.handler_task.take() {
376            handle.abort();
377        }
378
379        // Drop receiver to stop any consumers
380        self.out_rx = None;
381
382        tracing::info!("Disconnected dYdX WebSocket");
383        Ok(())
384    }
385
386    /// Sends a text message via the handler.
387    async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
388        self.cmd_tx
389            .send(HandlerCommand::SendText(text.to_string()))
390            .map_err(|e| {
391                DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
392            })?;
393        Ok(())
394    }
395
396    /// Sends a command to the handler.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the handler task has terminated.
401    pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
402        self.cmd_tx.send(cmd).map_err(|e| {
403            DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
404        })?;
405        Ok(())
406    }
407
408    fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
409        let mut s = instrument_id.symbol.as_str().to_string();
410        if let Some(stripped) = s.strip_suffix("-PERP") {
411            s = stripped.to_string();
412        }
413        s
414    }
415
416    /// Subscribes to public trade updates for a specific instrument.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the subscription request fails.
421    ///
422    /// # References
423    ///
424    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
425    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
426        let ticker = Self::ticker_from_instrument_id(&instrument_id);
427        let sub = super::messages::DydxSubscription {
428            op: super::enums::DydxWsOperation::Subscribe,
429            channel: super::enums::DydxWsChannel::Trades,
430            id: Some(ticker),
431        };
432        let payload = serde_json::to_string(&sub)?;
433        self.send_text_inner(&payload).await
434    }
435
436    /// Unsubscribes from public trade updates for a specific instrument.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the unsubscription request fails.
441    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
442        let ticker = Self::ticker_from_instrument_id(&instrument_id);
443        let sub = super::messages::DydxSubscription {
444            op: super::enums::DydxWsOperation::Unsubscribe,
445            channel: super::enums::DydxWsChannel::Trades,
446            id: Some(ticker),
447        };
448        let payload = serde_json::to_string(&sub)?;
449        self.send_text_inner(&payload).await
450    }
451
452    /// Subscribes to orderbook updates for a specific instrument.
453    ///
454    /// # Errors
455    ///
456    /// Returns an error if the subscription request fails.
457    ///
458    /// # References
459    ///
460    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
461    pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
462        let ticker = Self::ticker_from_instrument_id(&instrument_id);
463        let sub = super::messages::DydxSubscription {
464            op: super::enums::DydxWsOperation::Subscribe,
465            channel: super::enums::DydxWsChannel::Orderbook,
466            id: Some(ticker),
467        };
468        let payload = serde_json::to_string(&sub)?;
469        self.send_text_inner(&payload).await
470    }
471
472    /// Unsubscribes from orderbook updates for a specific instrument.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if the unsubscription request fails.
477    pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
478        let ticker = Self::ticker_from_instrument_id(&instrument_id);
479        let sub = super::messages::DydxSubscription {
480            op: super::enums::DydxWsOperation::Unsubscribe,
481            channel: super::enums::DydxWsChannel::Orderbook,
482            id: Some(ticker),
483        };
484        let payload = serde_json::to_string(&sub)?;
485        self.send_text_inner(&payload).await
486    }
487
488    /// Subscribes to candle/kline updates for a specific instrument.
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if the subscription request fails.
493    ///
494    /// # References
495    ///
496    /// <https://docs.dydx.trade/developers/indexer/websockets#candles-channel>
497    pub async fn subscribe_candles(
498        &self,
499        instrument_id: InstrumentId,
500        resolution: &str,
501    ) -> DydxWsResult<()> {
502        let ticker = Self::ticker_from_instrument_id(&instrument_id);
503        let id = format!("{ticker}/{resolution}");
504        let sub = super::messages::DydxSubscription {
505            op: super::enums::DydxWsOperation::Subscribe,
506            channel: super::enums::DydxWsChannel::Candles,
507            id: Some(id),
508        };
509        let payload = serde_json::to_string(&sub)?;
510        self.send_text_inner(&payload).await
511    }
512
513    /// Unsubscribes from candle/kline updates for a specific instrument.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the unsubscription request fails.
518    pub async fn unsubscribe_candles(
519        &self,
520        instrument_id: InstrumentId,
521        resolution: &str,
522    ) -> DydxWsResult<()> {
523        let ticker = Self::ticker_from_instrument_id(&instrument_id);
524        let id = format!("{ticker}/{resolution}");
525        let sub = super::messages::DydxSubscription {
526            op: super::enums::DydxWsOperation::Unsubscribe,
527            channel: super::enums::DydxWsChannel::Candles,
528            id: Some(id),
529        };
530        let payload = serde_json::to_string(&sub)?;
531        self.send_text_inner(&payload).await
532    }
533
534    /// Subscribes to market updates for all instruments.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the subscription request fails.
539    ///
540    /// # References
541    ///
542    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
543    pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
544        let sub = super::messages::DydxSubscription {
545            op: super::enums::DydxWsOperation::Subscribe,
546            channel: super::enums::DydxWsChannel::Markets,
547            id: None,
548        };
549        let payload = serde_json::to_string(&sub)?;
550        self.send_text_inner(&payload).await
551    }
552
553    /// Unsubscribes from market updates.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the unsubscription request fails.
558    pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
559        let sub = super::messages::DydxSubscription {
560            op: super::enums::DydxWsOperation::Unsubscribe,
561            channel: super::enums::DydxWsChannel::Markets,
562            id: None,
563        };
564        let payload = serde_json::to_string(&sub)?;
565        self.send_text_inner(&payload).await
566    }
567
568    /// Subscribes to subaccount updates (orders, fills, positions, balances).
569    ///
570    /// This requires authentication and will only work for private WebSocket clients
571    /// created with [`Self::new_private`].
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if the client was not created with credentials or if the
576    /// subscription request fails.
577    ///
578    /// # References
579    ///
580    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
581    pub async fn subscribe_subaccount(
582        &self,
583        address: &str,
584        subaccount_number: u32,
585    ) -> DydxWsResult<()> {
586        if !self.requires_auth {
587            return Err(DydxWsError::Authentication(
588                "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
589            ));
590        }
591        let id = format!("{address}/{subaccount_number}");
592        let sub = super::messages::DydxSubscription {
593            op: super::enums::DydxWsOperation::Subscribe,
594            channel: super::enums::DydxWsChannel::Subaccounts,
595            id: Some(id),
596        };
597        let payload = serde_json::to_string(&sub)?;
598        self.send_text_inner(&payload).await
599    }
600
601    /// Unsubscribes from subaccount updates.
602    ///
603    /// # Errors
604    ///
605    /// Returns an error if the unsubscription request fails.
606    pub async fn unsubscribe_subaccount(
607        &self,
608        address: &str,
609        subaccount_number: u32,
610    ) -> DydxWsResult<()> {
611        let id = format!("{address}/{subaccount_number}");
612        let sub = super::messages::DydxSubscription {
613            op: super::enums::DydxWsOperation::Unsubscribe,
614            channel: super::enums::DydxWsChannel::Subaccounts,
615            id: Some(id),
616        };
617        let payload = serde_json::to_string(&sub)?;
618        self.send_text_inner(&payload).await
619    }
620
621    /// Subscribes to block height updates.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if the subscription request fails.
626    ///
627    /// # References
628    ///
629    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
630    pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
631        let sub = super::messages::DydxSubscription {
632            op: super::enums::DydxWsOperation::Subscribe,
633            channel: super::enums::DydxWsChannel::BlockHeight,
634            id: None,
635        };
636        let payload = serde_json::to_string(&sub)?;
637        self.send_text_inner(&payload).await
638    }
639
640    /// Unsubscribes from block height updates.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the unsubscription request fails.
645    pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
646        let sub = super::messages::DydxSubscription {
647            op: super::enums::DydxWsOperation::Unsubscribe,
648            channel: super::enums::DydxWsChannel::BlockHeight,
649            id: None,
650        };
651        let payload = serde_json::to_string(&sub)?;
652        self.send_text_inner(&payload).await
653    }
654}