1pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";
37
38pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
40
41pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
43 LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46 num::NonZeroU32,
47 sync::{
48 Arc, LazyLock,
49 atomic::{AtomicBool, AtomicU8, Ordering},
50 },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57 identifiers::{AccountId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59};
60use nautilus_network::{
61 mode::ConnectionMode,
62 ratelimiter::quota::Quota,
63 websocket::{
64 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65 },
66};
67use ustr::Ustr;
68
69use super::{
70 enums::NautilusWsMessage,
71 error::{DydxWsError, DydxWsResult},
72 handler::{FeedHandler, HandlerCommand},
73};
74use crate::common::credential::DydxCredential;
75
76#[derive(Debug)]
100#[cfg_attr(
101 feature = "python",
102 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
103)]
104pub struct DydxWebSocketClient {
105 url: String,
107 credential: Option<Arc<DydxCredential>>,
109 requires_auth: bool,
111 auth_tracker: AuthTracker,
113 subscriptions: SubscriptionState,
115 connection_mode: Arc<ArcSwap<AtomicU8>>,
117 signal: Arc<AtomicBool>,
119 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
121 account_id: Option<AccountId>,
123 heartbeat: Option<u64>,
125 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
127 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
129 handler_task: Option<tokio::task::JoinHandle<()>>,
131}
132
133impl Clone for DydxWebSocketClient {
134 fn clone(&self) -> Self {
135 Self {
136 url: self.url.clone(),
137 credential: self.credential.clone(),
138 requires_auth: self.requires_auth,
139 auth_tracker: self.auth_tracker.clone(),
140 subscriptions: self.subscriptions.clone(),
141 connection_mode: self.connection_mode.clone(),
142 signal: self.signal.clone(),
143 instruments_cache: self.instruments_cache.clone(),
144 account_id: self.account_id,
145 heartbeat: self.heartbeat,
146 cmd_tx: self.cmd_tx.clone(),
147 out_rx: None, handler_task: None, }
150 }
151}
152
153impl DydxWebSocketClient {
154 #[must_use]
156 pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
157 use std::sync::atomic::AtomicU8;
158
159 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
161
162 Self {
163 url,
164 credential: None,
165 requires_auth: false,
166 auth_tracker: AuthTracker::new(),
167 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
168 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
169 ConnectionMode::Closed as u8,
170 ))),
171 signal: Arc::new(AtomicBool::new(false)),
172 instruments_cache: Arc::new(DashMap::new()),
173 account_id: None,
174 heartbeat: _heartbeat,
175 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
176 out_rx: None,
177 handler_task: None,
178 }
179 }
180
181 #[must_use]
183 pub fn new_private(
184 url: String,
185 credential: DydxCredential,
186 account_id: AccountId,
187 _heartbeat: Option<u64>,
188 ) -> Self {
189 use std::sync::atomic::AtomicU8;
190
191 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
193
194 Self {
195 url,
196 credential: Some(Arc::new(credential)),
197 requires_auth: true,
198 auth_tracker: AuthTracker::new(),
199 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
200 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
201 ConnectionMode::Closed as u8,
202 ))),
203 signal: Arc::new(AtomicBool::new(false)),
204 instruments_cache: Arc::new(DashMap::new()),
205 account_id: Some(account_id),
206 heartbeat: _heartbeat,
207 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
208 out_rx: None,
209 handler_task: None,
210 }
211 }
212
213 #[must_use]
215 pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
216 self.credential.as_ref()
217 }
218
219 #[must_use]
221 pub fn is_connected(&self) -> bool {
222 let mode = self.connection_mode.load();
223 let mode_u8 = mode.load(Ordering::Relaxed);
224 matches!(
225 mode_u8,
226 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
227 )
228 }
229
230 #[must_use]
232 pub fn url(&self) -> &str {
233 &self.url
234 }
235
236 #[must_use]
240 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
241 self.connection_mode.clone()
242 }
243
244 pub fn set_account_id(&mut self, account_id: AccountId) {
246 self.account_id = Some(account_id);
247 }
248
249 #[must_use]
251 pub fn account_id(&self) -> Option<AccountId> {
252 self.account_id
253 }
254
255 pub fn cache_instrument(&self, instrument: InstrumentAny) {
259 let symbol = instrument.id().symbol.inner();
260 self.instruments_cache.insert(symbol, instrument.clone());
261
262 if let Ok(cmd_tx) = self.cmd_tx.try_read()
265 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
266 {
267 tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
268 }
269 }
270
271 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
275 for instrument in &instruments {
276 self.instruments_cache
277 .insert(instrument.id().symbol.inner(), instrument.clone());
278 }
279
280 if !instruments.is_empty()
283 && let Ok(cmd_tx) = self.cmd_tx.try_read()
284 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
285 {
286 tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
287 }
288 }
289
290 #[must_use]
292 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
293 &self.instruments_cache
294 }
295
296 #[must_use]
300 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
301 self.instruments_cache.get(symbol).map(|r| r.clone())
302 }
303
304 pub fn take_receiver(
307 &mut self,
308 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
309 self.out_rx.take()
310 }
311
312 pub async fn connect(&mut self) -> DydxWsResult<()> {
321 if self.is_connected() {
322 return Ok(());
323 }
324
325 self.signal.store(false, Ordering::Relaxed);
327
328 let (message_handler, raw_rx) = channel_message_handler();
329
330 let cfg = WebSocketConfig {
331 url: self.url.clone(),
332 headers: vec![],
333 heartbeat: self.heartbeat,
334 heartbeat_msg: None,
335 reconnect_timeout_ms: Some(15_000),
336 reconnect_delay_initial_ms: Some(250),
337 reconnect_delay_max_ms: Some(5_000),
338 reconnect_backoff_factor: Some(2.0),
339 reconnect_jitter_ms: Some(200),
340 reconnect_max_attempts: None,
341 };
342
343 let client = WebSocketClient::connect(
344 cfg,
345 Some(message_handler),
346 None,
347 None,
348 vec![],
349 Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
350 )
351 .await
352 .map_err(|e| DydxWsError::Transport(e.to_string()))?;
353
354 self.connection_mode.store(client.connection_mode_atomic());
356
357 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
359 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
360
361 {
363 let mut guard = self.cmd_tx.write().await;
364 *guard = cmd_tx;
365 }
366 self.out_rx = Some(out_rx);
367
368 if !self.instruments_cache.is_empty() {
370 let cached_instruments: Vec<InstrumentAny> = self
371 .instruments_cache
372 .iter()
373 .map(|entry| entry.value().clone())
374 .collect();
375 let cmd_tx_guard = self.cmd_tx.read().await;
376 if let Err(e) =
377 cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
378 {
379 tracing::error!("Failed to replay instruments to handler: {e}");
380 }
381 }
382
383 let account_id = self.account_id;
385 let signal = self.signal.clone();
386 let subscriptions = self.subscriptions.clone();
387
388 let handler_task = get_runtime().spawn(async move {
389 let mut handler = FeedHandler::new(
390 account_id,
391 cmd_rx,
392 out_tx,
393 raw_rx,
394 client,
395 signal,
396 subscriptions,
397 );
398 handler.run().await;
399 });
400
401 self.handler_task = Some(handler_task);
402 tracing::info!("Connected dYdX WebSocket: {}", self.url);
403 Ok(())
404 }
405
406 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
412 self.signal.store(true, Ordering::Relaxed);
414
415 self.connection_mode
418 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
419
420 if let Some(handle) = self.handler_task.take() {
422 handle.abort();
423 }
424
425 self.out_rx = None;
427
428 tracing::info!("Disconnected dYdX WebSocket");
429 Ok(())
430 }
431
432 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
434 self.cmd_tx
435 .read()
436 .await
437 .send(HandlerCommand::SendText(text.to_string()))
438 .map_err(|e| {
439 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
440 })?;
441 Ok(())
442 }
443
444 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
450 if let Ok(guard) = self.cmd_tx.try_read() {
451 guard.send(cmd).map_err(|e| {
452 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
453 })?;
454 } else {
455 return Err(DydxWsError::Transport(
456 "Failed to acquire lock on command channel".to_string(),
457 ));
458 }
459 Ok(())
460 }
461
462 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
463 let mut s = instrument_id.symbol.as_str().to_string();
464 if let Some(stripped) = s.strip_suffix("-PERP") {
465 s = stripped.to_string();
466 }
467 s
468 }
469
470 fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
471 match id {
472 Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
473 None => channel.as_ref().to_string(),
474 }
475 }
476
477 async fn send_and_track_subscribe(
478 &self,
479 sub: super::messages::DydxSubscription,
480 topic: &str,
481 ) -> DydxWsResult<()> {
482 self.subscriptions.mark_subscribe(topic);
483 let payload = serde_json::to_string(&sub)?;
484 if let Err(e) = self.send_text_inner(&payload).await {
485 self.subscriptions.mark_failure(topic);
486 self.subscriptions.remove_reference(topic);
487 return Err(e);
488 }
489 Ok(())
490 }
491
492 async fn send_and_track_unsubscribe(
493 &self,
494 sub: super::messages::DydxSubscription,
495 topic: &str,
496 ) -> DydxWsResult<()> {
497 self.subscriptions.mark_unsubscribe(topic);
498 let payload = serde_json::to_string(&sub)?;
499 if let Err(e) = self.send_text_inner(&payload).await {
500 self.subscriptions.add_reference(topic);
502 self.subscriptions.mark_subscribe(topic);
503 return Err(e);
504 }
505 Ok(())
506 }
507
508 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
518 let ticker = Self::ticker_from_instrument_id(&instrument_id);
519 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
520 if !self.subscriptions.add_reference(&topic) {
521 return Ok(());
522 }
523
524 let sub = super::messages::DydxSubscription {
525 op: super::enums::DydxWsOperation::Subscribe,
526 channel: super::enums::DydxWsChannel::Trades,
527 id: Some(ticker),
528 };
529
530 self.send_and_track_subscribe(sub, &topic).await
531 }
532
533 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
539 let ticker = Self::ticker_from_instrument_id(&instrument_id);
540 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
541 if !self.subscriptions.remove_reference(&topic) {
542 return Ok(());
543 }
544
545 let sub = super::messages::DydxSubscription {
546 op: super::enums::DydxWsOperation::Unsubscribe,
547 channel: super::enums::DydxWsChannel::Trades,
548 id: Some(ticker),
549 };
550
551 self.send_and_track_unsubscribe(sub, &topic).await
552 }
553
554 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
564 let ticker = Self::ticker_from_instrument_id(&instrument_id);
565 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
566 if !self.subscriptions.add_reference(&topic) {
567 return Ok(());
568 }
569
570 let sub = super::messages::DydxSubscription {
571 op: super::enums::DydxWsOperation::Subscribe,
572 channel: super::enums::DydxWsChannel::Orderbook,
573 id: Some(ticker),
574 };
575
576 self.send_and_track_subscribe(sub, &topic).await
577 }
578
579 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
585 let ticker = Self::ticker_from_instrument_id(&instrument_id);
586 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
587 if !self.subscriptions.remove_reference(&topic) {
588 return Ok(());
589 }
590
591 let sub = super::messages::DydxSubscription {
592 op: super::enums::DydxWsOperation::Unsubscribe,
593 channel: super::enums::DydxWsChannel::Orderbook,
594 id: Some(ticker),
595 };
596
597 self.send_and_track_unsubscribe(sub, &topic).await
598 }
599
600 pub async fn subscribe_candles(
610 &self,
611 instrument_id: InstrumentId,
612 resolution: &str,
613 ) -> DydxWsResult<()> {
614 let ticker = Self::ticker_from_instrument_id(&instrument_id);
615 let id = format!("{ticker}/{resolution}");
616 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
617 if !self.subscriptions.add_reference(&topic) {
618 return Ok(());
619 }
620
621 let sub = super::messages::DydxSubscription {
622 op: super::enums::DydxWsOperation::Subscribe,
623 channel: super::enums::DydxWsChannel::Candles,
624 id: Some(id),
625 };
626
627 self.send_and_track_subscribe(sub, &topic).await
628 }
629
630 pub async fn unsubscribe_candles(
636 &self,
637 instrument_id: InstrumentId,
638 resolution: &str,
639 ) -> DydxWsResult<()> {
640 let ticker = Self::ticker_from_instrument_id(&instrument_id);
641 let id = format!("{ticker}/{resolution}");
642 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
643 if !self.subscriptions.remove_reference(&topic) {
644 return Ok(());
645 }
646
647 let sub = super::messages::DydxSubscription {
648 op: super::enums::DydxWsOperation::Unsubscribe,
649 channel: super::enums::DydxWsChannel::Candles,
650 id: Some(id),
651 };
652
653 self.send_and_track_unsubscribe(sub, &topic).await
654 }
655
656 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
666 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
667 if !self.subscriptions.add_reference(&topic) {
668 return Ok(());
669 }
670
671 let sub = super::messages::DydxSubscription {
672 op: super::enums::DydxWsOperation::Subscribe,
673 channel: super::enums::DydxWsChannel::Markets,
674 id: None,
675 };
676
677 self.send_and_track_subscribe(sub, &topic).await
678 }
679
680 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
686 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
687 if !self.subscriptions.remove_reference(&topic) {
688 return Ok(());
689 }
690
691 let sub = super::messages::DydxSubscription {
692 op: super::enums::DydxWsOperation::Unsubscribe,
693 channel: super::enums::DydxWsChannel::Markets,
694 id: None,
695 };
696
697 self.send_and_track_unsubscribe(sub, &topic).await
698 }
699
700 pub async fn subscribe_subaccount(
714 &self,
715 address: &str,
716 subaccount_number: u32,
717 ) -> DydxWsResult<()> {
718 if !self.requires_auth {
719 return Err(DydxWsError::Authentication(
720 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
721 ));
722 }
723 let id = format!("{address}/{subaccount_number}");
724 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
725 if !self.subscriptions.add_reference(&topic) {
726 return Ok(());
727 }
728
729 let sub = super::messages::DydxSubscription {
730 op: super::enums::DydxWsOperation::Subscribe,
731 channel: super::enums::DydxWsChannel::Subaccounts,
732 id: Some(id),
733 };
734
735 self.send_and_track_subscribe(sub, &topic).await
736 }
737
738 pub async fn unsubscribe_subaccount(
744 &self,
745 address: &str,
746 subaccount_number: u32,
747 ) -> DydxWsResult<()> {
748 let id = format!("{address}/{subaccount_number}");
749 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
750 if !self.subscriptions.remove_reference(&topic) {
751 return Ok(());
752 }
753
754 let sub = super::messages::DydxSubscription {
755 op: super::enums::DydxWsOperation::Unsubscribe,
756 channel: super::enums::DydxWsChannel::Subaccounts,
757 id: Some(id),
758 };
759
760 self.send_and_track_unsubscribe(sub, &topic).await
761 }
762
763 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
773 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
774 if !self.subscriptions.add_reference(&topic) {
775 return Ok(());
776 }
777
778 let sub = super::messages::DydxSubscription {
779 op: super::enums::DydxWsOperation::Subscribe,
780 channel: super::enums::DydxWsChannel::BlockHeight,
781 id: None,
782 };
783
784 self.send_and_track_subscribe(sub, &topic).await
785 }
786
787 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
793 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
794 if !self.subscriptions.remove_reference(&topic) {
795 return Ok(());
796 }
797
798 let sub = super::messages::DydxSubscription {
799 op: super::enums::DydxWsOperation::Unsubscribe,
800 channel: super::enums::DydxWsChannel::BlockHeight,
801 id: None,
802 };
803
804 self.send_and_track_unsubscribe(sub, &topic).await
805 }
806}