1pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";
37
38pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
40
41pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
43 LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46 num::NonZeroU32,
47 sync::{
48 Arc, LazyLock,
49 atomic::{AtomicBool, AtomicU8, Ordering},
50 },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_model::{
56 identifiers::{AccountId, InstrumentId},
57 instruments::{Instrument, InstrumentAny},
58};
59use nautilus_network::{
60 mode::ConnectionMode,
61 ratelimiter::quota::Quota,
62 websocket::{
63 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
64 },
65};
66use ustr::Ustr;
67
68use super::{
69 enums::NautilusWsMessage,
70 error::{DydxWsError, DydxWsResult},
71 handler::{FeedHandler, HandlerCommand},
72};
73use crate::common::credential::DydxCredential;
74
/// WebSocket client for dYdX that delegates message handling to a background
/// [`FeedHandler`] task and talks to it through a command channel.
///
/// Clones share the connection mode, signal flag, instrument cache and command sender,
/// but never the output receiver or the handler task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxWebSocketClient {
    /// Endpoint URL used when opening the connection.
    url: String,
    /// Credential supplied through `new_private`; `None` for public clients.
    credential: Option<Arc<DydxCredential>>,
    /// `true` for clients built with `new_private`; gates subaccount subscriptions.
    requires_auth: bool,
    /// Authentication tracker created together with the client.
    auth_tracker: AuthTracker,
    /// Topic bookkeeping; a clone is handed to the feed handler on connect.
    subscriptions: SubscriptionState,
    /// Swappable handle to the connection-mode atomic. Replaced with the transport
    /// client's atomic on connect and with a fresh `Closed` value on disconnect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Flag cleared on connect and set on disconnect; a clone is given to the feed handler.
    signal: Arc<AtomicBool>,
    /// Instruments keyed by symbol; replayed to the handler on connect.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Account id passed to the feed handler, if one has been set.
    account_id: Option<AccountId>,
    /// Heartbeat setting forwarded to `WebSocketConfig::heartbeat` (unit not visible here — TODO confirm).
    heartbeat: Option<u64>,
    /// Sender for handler commands. Starts as a placeholder and is replaced on each
    /// connect; the shared lock lets clones observe the replacement.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler; set by connect and handed out via `take_receiver`.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Join handle of the spawned handler task; aborted on disconnect.
    handler_task: Option<tokio::task::JoinHandle<()>>,
}
131
132impl Clone for DydxWebSocketClient {
133 fn clone(&self) -> Self {
134 Self {
135 url: self.url.clone(),
136 credential: self.credential.clone(),
137 requires_auth: self.requires_auth,
138 auth_tracker: self.auth_tracker.clone(),
139 subscriptions: self.subscriptions.clone(),
140 connection_mode: self.connection_mode.clone(),
141 signal: self.signal.clone(),
142 instruments_cache: self.instruments_cache.clone(),
143 account_id: self.account_id,
144 heartbeat: self.heartbeat,
145 cmd_tx: self.cmd_tx.clone(),
146 out_rx: None, handler_task: None, }
149 }
150}
151
152impl DydxWebSocketClient {
153 #[must_use]
155 pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
156 use std::sync::atomic::AtomicU8;
157
158 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
160
161 Self {
162 url,
163 credential: None,
164 requires_auth: false,
165 auth_tracker: AuthTracker::new(),
166 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
167 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
168 ConnectionMode::Closed as u8,
169 ))),
170 signal: Arc::new(AtomicBool::new(false)),
171 instruments_cache: Arc::new(DashMap::new()),
172 account_id: None,
173 heartbeat: _heartbeat,
174 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
175 out_rx: None,
176 handler_task: None,
177 }
178 }
179
180 #[must_use]
182 pub fn new_private(
183 url: String,
184 credential: DydxCredential,
185 account_id: AccountId,
186 _heartbeat: Option<u64>,
187 ) -> Self {
188 use std::sync::atomic::AtomicU8;
189
190 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
192
193 Self {
194 url,
195 credential: Some(Arc::new(credential)),
196 requires_auth: true,
197 auth_tracker: AuthTracker::new(),
198 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
199 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
200 ConnectionMode::Closed as u8,
201 ))),
202 signal: Arc::new(AtomicBool::new(false)),
203 instruments_cache: Arc::new(DashMap::new()),
204 account_id: Some(account_id),
205 heartbeat: _heartbeat,
206 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
207 out_rx: None,
208 handler_task: None,
209 }
210 }
211
    /// Returns a reference to the stored credential, or `None` when the client was
    /// created without one.
    #[must_use]
    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
        self.credential.as_ref()
    }
217
218 #[must_use]
220 pub fn is_connected(&self) -> bool {
221 let mode = self.connection_mode.load();
222 let mode_u8 = mode.load(Ordering::Relaxed);
223 matches!(
224 mode_u8,
225 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
226 )
227 }
228
229 #[must_use]
231 pub fn url(&self) -> &str {
232 &self.url
233 }
234
235 #[must_use]
239 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
240 self.connection_mode.clone()
241 }
242
    /// Stores `account_id` on this client, replacing any previous value.
    ///
    /// The value is read when `connect` builds the feed handler, so a change made
    /// afterwards is not passed to an already running handler by this method.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
247
    /// Returns the account id stored on this client, if any.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
253
254 pub fn cache_instrument(&self, instrument: InstrumentAny) {
258 let symbol = instrument.id().symbol.inner();
259 self.instruments_cache.insert(symbol, instrument.clone());
260
261 if let Ok(cmd_tx) = self.cmd_tx.try_read()
264 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
265 {
266 tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
267 }
268 }
269
270 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
274 for instrument in &instruments {
275 self.instruments_cache
276 .insert(instrument.id().symbol.inner(), instrument.clone());
277 }
278
279 if !instruments.is_empty()
282 && let Ok(cmd_tx) = self.cmd_tx.try_read()
283 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
284 {
285 tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
286 }
287 }
288
    /// Returns a reference to the shared instrument cache, keyed by symbol.
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
294
295 #[must_use]
299 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
300 self.instruments_cache.get(symbol).map(|r| r.clone())
301 }
302
    /// Moves the handler output receiver out of the client, leaving `None` behind.
    ///
    /// Returns `None` if no receiver is present: before `connect`, after `disconnect`,
    /// on a cloned client, or when it has already been taken.
    pub fn take_receiver(
        &mut self,
    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
        self.out_rx.take()
    }
310
311 pub async fn connect(&mut self) -> DydxWsResult<()> {
320 if self.is_connected() {
321 return Ok(());
322 }
323
324 self.signal.store(false, Ordering::Relaxed);
326
327 let (message_handler, raw_rx) = channel_message_handler();
328
329 let cfg = WebSocketConfig {
330 url: self.url.clone(),
331 headers: vec![],
332 message_handler: Some(message_handler),
333 heartbeat: self.heartbeat,
334 heartbeat_msg: None,
335 ping_handler: None,
336 reconnect_timeout_ms: Some(15_000),
337 reconnect_delay_initial_ms: Some(250),
338 reconnect_delay_max_ms: Some(5_000),
339 reconnect_backoff_factor: Some(2.0),
340 reconnect_jitter_ms: Some(200),
341 reconnect_max_attempts: None,
342 };
343
344 let client = WebSocketClient::connect(cfg, None, vec![], Some(*DYDX_WS_SUBSCRIPTION_QUOTA))
345 .await
346 .map_err(|e| DydxWsError::Transport(e.to_string()))?;
347
348 self.connection_mode.store(client.connection_mode_atomic());
350
351 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
353 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
354
355 {
357 let mut guard = self.cmd_tx.write().await;
358 *guard = cmd_tx;
359 }
360 self.out_rx = Some(out_rx);
361
362 if !self.instruments_cache.is_empty() {
364 let cached_instruments: Vec<InstrumentAny> = self
365 .instruments_cache
366 .iter()
367 .map(|entry| entry.value().clone())
368 .collect();
369 let cmd_tx_guard = self.cmd_tx.read().await;
370 if let Err(e) =
371 cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
372 {
373 tracing::error!("Failed to replay instruments to handler: {e}");
374 }
375 }
376
377 let account_id = self.account_id;
379 let signal = self.signal.clone();
380 let subscriptions = self.subscriptions.clone();
381
382 let handler_task = tokio::spawn(async move {
383 let mut handler = FeedHandler::new(
384 account_id,
385 cmd_rx,
386 out_tx,
387 raw_rx,
388 client,
389 signal,
390 subscriptions,
391 );
392 handler.run().await;
393 });
394
395 self.handler_task = Some(handler_task);
396 tracing::info!("Connected dYdX WebSocket: {}", self.url);
397 Ok(())
398 }
399
400 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
406 self.signal.store(true, Ordering::Relaxed);
408
409 self.connection_mode
412 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
413
414 if let Some(handle) = self.handler_task.take() {
416 handle.abort();
417 }
418
419 self.out_rx = None;
421
422 tracing::info!("Disconnected dYdX WebSocket");
423 Ok(())
424 }
425
426 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
428 self.cmd_tx
429 .read()
430 .await
431 .send(HandlerCommand::SendText(text.to_string()))
432 .map_err(|e| {
433 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
434 })?;
435 Ok(())
436 }
437
438 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
444 if let Ok(guard) = self.cmd_tx.try_read() {
445 guard.send(cmd).map_err(|e| {
446 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
447 })?;
448 } else {
449 return Err(DydxWsError::Transport(
450 "Failed to acquire lock on command channel".to_string(),
451 ));
452 }
453 Ok(())
454 }
455
456 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
457 let mut s = instrument_id.symbol.as_str().to_string();
458 if let Some(stripped) = s.strip_suffix("-PERP") {
459 s = stripped.to_string();
460 }
461 s
462 }
463
464 fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
465 match id {
466 Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
467 None => channel.as_ref().to_string(),
468 }
469 }
470
471 async fn send_and_track_subscribe(
472 &self,
473 sub: super::messages::DydxSubscription,
474 topic: &str,
475 ) -> DydxWsResult<()> {
476 self.subscriptions.mark_subscribe(topic);
477 let payload = serde_json::to_string(&sub)?;
478 if let Err(e) = self.send_text_inner(&payload).await {
479 self.subscriptions.mark_failure(topic);
480 self.subscriptions.remove_reference(topic);
481 return Err(e);
482 }
483 Ok(())
484 }
485
486 async fn send_and_track_unsubscribe(
487 &self,
488 sub: super::messages::DydxSubscription,
489 topic: &str,
490 ) -> DydxWsResult<()> {
491 self.subscriptions.mark_unsubscribe(topic);
492 let payload = serde_json::to_string(&sub)?;
493 if let Err(e) = self.send_text_inner(&payload).await {
494 self.subscriptions.add_reference(topic);
496 self.subscriptions.mark_subscribe(topic);
497 return Err(e);
498 }
499 Ok(())
500 }
501
502 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
512 let ticker = Self::ticker_from_instrument_id(&instrument_id);
513 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
514 if !self.subscriptions.add_reference(&topic) {
515 return Ok(());
516 }
517
518 let sub = super::messages::DydxSubscription {
519 op: super::enums::DydxWsOperation::Subscribe,
520 channel: super::enums::DydxWsChannel::Trades,
521 id: Some(ticker),
522 };
523
524 self.send_and_track_subscribe(sub, &topic).await
525 }
526
527 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
533 let ticker = Self::ticker_from_instrument_id(&instrument_id);
534 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
535 if !self.subscriptions.remove_reference(&topic) {
536 return Ok(());
537 }
538
539 let sub = super::messages::DydxSubscription {
540 op: super::enums::DydxWsOperation::Unsubscribe,
541 channel: super::enums::DydxWsChannel::Trades,
542 id: Some(ticker),
543 };
544
545 self.send_and_track_unsubscribe(sub, &topic).await
546 }
547
548 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
558 let ticker = Self::ticker_from_instrument_id(&instrument_id);
559 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
560 if !self.subscriptions.add_reference(&topic) {
561 return Ok(());
562 }
563
564 let sub = super::messages::DydxSubscription {
565 op: super::enums::DydxWsOperation::Subscribe,
566 channel: super::enums::DydxWsChannel::Orderbook,
567 id: Some(ticker),
568 };
569
570 self.send_and_track_subscribe(sub, &topic).await
571 }
572
573 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
579 let ticker = Self::ticker_from_instrument_id(&instrument_id);
580 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
581 if !self.subscriptions.remove_reference(&topic) {
582 return Ok(());
583 }
584
585 let sub = super::messages::DydxSubscription {
586 op: super::enums::DydxWsOperation::Unsubscribe,
587 channel: super::enums::DydxWsChannel::Orderbook,
588 id: Some(ticker),
589 };
590
591 self.send_and_track_unsubscribe(sub, &topic).await
592 }
593
594 pub async fn subscribe_candles(
604 &self,
605 instrument_id: InstrumentId,
606 resolution: &str,
607 ) -> DydxWsResult<()> {
608 let ticker = Self::ticker_from_instrument_id(&instrument_id);
609 let id = format!("{ticker}/{resolution}");
610 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
611 if !self.subscriptions.add_reference(&topic) {
612 return Ok(());
613 }
614
615 let sub = super::messages::DydxSubscription {
616 op: super::enums::DydxWsOperation::Subscribe,
617 channel: super::enums::DydxWsChannel::Candles,
618 id: Some(id),
619 };
620
621 self.send_and_track_subscribe(sub, &topic).await
622 }
623
624 pub async fn unsubscribe_candles(
630 &self,
631 instrument_id: InstrumentId,
632 resolution: &str,
633 ) -> DydxWsResult<()> {
634 let ticker = Self::ticker_from_instrument_id(&instrument_id);
635 let id = format!("{ticker}/{resolution}");
636 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
637 if !self.subscriptions.remove_reference(&topic) {
638 return Ok(());
639 }
640
641 let sub = super::messages::DydxSubscription {
642 op: super::enums::DydxWsOperation::Unsubscribe,
643 channel: super::enums::DydxWsChannel::Candles,
644 id: Some(id),
645 };
646
647 self.send_and_track_unsubscribe(sub, &topic).await
648 }
649
650 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
660 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
661 if !self.subscriptions.add_reference(&topic) {
662 return Ok(());
663 }
664
665 let sub = super::messages::DydxSubscription {
666 op: super::enums::DydxWsOperation::Subscribe,
667 channel: super::enums::DydxWsChannel::Markets,
668 id: None,
669 };
670
671 self.send_and_track_subscribe(sub, &topic).await
672 }
673
674 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
680 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
681 if !self.subscriptions.remove_reference(&topic) {
682 return Ok(());
683 }
684
685 let sub = super::messages::DydxSubscription {
686 op: super::enums::DydxWsOperation::Unsubscribe,
687 channel: super::enums::DydxWsChannel::Markets,
688 id: None,
689 };
690
691 self.send_and_track_unsubscribe(sub, &topic).await
692 }
693
694 pub async fn subscribe_subaccount(
708 &self,
709 address: &str,
710 subaccount_number: u32,
711 ) -> DydxWsResult<()> {
712 if !self.requires_auth {
713 return Err(DydxWsError::Authentication(
714 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
715 ));
716 }
717 let id = format!("{address}/{subaccount_number}");
718 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
719 if !self.subscriptions.add_reference(&topic) {
720 return Ok(());
721 }
722
723 let sub = super::messages::DydxSubscription {
724 op: super::enums::DydxWsOperation::Subscribe,
725 channel: super::enums::DydxWsChannel::Subaccounts,
726 id: Some(id),
727 };
728
729 self.send_and_track_subscribe(sub, &topic).await
730 }
731
732 pub async fn unsubscribe_subaccount(
738 &self,
739 address: &str,
740 subaccount_number: u32,
741 ) -> DydxWsResult<()> {
742 let id = format!("{address}/{subaccount_number}");
743 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
744 if !self.subscriptions.remove_reference(&topic) {
745 return Ok(());
746 }
747
748 let sub = super::messages::DydxSubscription {
749 op: super::enums::DydxWsOperation::Unsubscribe,
750 channel: super::enums::DydxWsChannel::Subaccounts,
751 id: Some(id),
752 };
753
754 self.send_and_track_unsubscribe(sub, &topic).await
755 }
756
757 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
767 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
768 if !self.subscriptions.add_reference(&topic) {
769 return Ok(());
770 }
771
772 let sub = super::messages::DydxSubscription {
773 op: super::enums::DydxWsOperation::Subscribe,
774 channel: super::enums::DydxWsChannel::BlockHeight,
775 id: None,
776 };
777
778 self.send_and_track_subscribe(sub, &topic).await
779 }
780
781 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
787 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
788 if !self.subscriptions.remove_reference(&topic) {
789 return Ok(());
790 }
791
792 let sub = super::messages::DydxSubscription {
793 op: super::enums::DydxWsOperation::Unsubscribe,
794 channel: super::enums::DydxWsChannel::BlockHeight,
795 id: None,
796 };
797
798 self.send_and_track_unsubscribe(sub, &topic).await
799 }
800}