/// Rate-limit key string for subscription traffic.
///
/// NOTE(review): the consumer of this key is not visible in this file; presumably it
/// tags subscribe/unsubscribe requests for the rate limiter — confirm at the call site.
pub const DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";
37
/// Separator placed between a channel name and its identifier when a topic
/// string is built, and handed to the subscription state tracker on construction.
pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
40
41pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
43 LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
44
45use std::{
46 num::NonZeroU32,
47 sync::{
48 Arc, LazyLock,
49 atomic::{AtomicBool, AtomicU8, Ordering},
50 },
51};
52
53use arc_swap::ArcSwap;
54use dashmap::DashMap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57 identifiers::{AccountId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59};
60use nautilus_network::{
61 mode::ConnectionMode,
62 ratelimiter::quota::Quota,
63 websocket::{
64 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65 },
66};
67use ustr::Ustr;
68
69use super::{
70 enums::NautilusWsMessage,
71 error::{DydxWsError, DydxWsResult},
72 handler::{FeedHandler, HandlerCommand},
73};
74use crate::common::credential::DydxCredential;
75
76#[derive(Debug)]
100#[cfg_attr(
101 feature = "python",
102 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
103)]
104pub struct DydxWebSocketClient {
105 url: String,
107 credential: Option<Arc<DydxCredential>>,
109 requires_auth: bool,
111 auth_tracker: AuthTracker,
113 subscriptions: SubscriptionState,
115 connection_mode: Arc<ArcSwap<AtomicU8>>,
117 signal: Arc<AtomicBool>,
119 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
121 account_id: Option<AccountId>,
123 heartbeat: Option<u64>,
125 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
127 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
129 handler_task: Option<tokio::task::JoinHandle<()>>,
131}
132
133impl Clone for DydxWebSocketClient {
134 fn clone(&self) -> Self {
135 Self {
136 url: self.url.clone(),
137 credential: self.credential.clone(),
138 requires_auth: self.requires_auth,
139 auth_tracker: self.auth_tracker.clone(),
140 subscriptions: self.subscriptions.clone(),
141 connection_mode: self.connection_mode.clone(),
142 signal: self.signal.clone(),
143 instruments_cache: self.instruments_cache.clone(),
144 account_id: self.account_id,
145 heartbeat: self.heartbeat,
146 cmd_tx: self.cmd_tx.clone(),
147 out_rx: None, handler_task: None, }
150 }
151}
152
153impl DydxWebSocketClient {
154 #[must_use]
156 pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
157 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
159
160 Self {
161 url,
162 credential: None,
163 requires_auth: false,
164 auth_tracker: AuthTracker::new(),
165 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
166 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
167 ConnectionMode::Closed as u8,
168 ))),
169 signal: Arc::new(AtomicBool::new(false)),
170 instruments_cache: Arc::new(DashMap::new()),
171 account_id: None,
172 heartbeat: _heartbeat,
173 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
174 out_rx: None,
175 handler_task: None,
176 }
177 }
178
179 #[must_use]
181 pub fn new_private(
182 url: String,
183 credential: DydxCredential,
184 account_id: AccountId,
185 _heartbeat: Option<u64>,
186 ) -> Self {
187 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
189
190 Self {
191 url,
192 credential: Some(Arc::new(credential)),
193 requires_auth: true,
194 auth_tracker: AuthTracker::new(),
195 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
196 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
197 ConnectionMode::Closed as u8,
198 ))),
199 signal: Arc::new(AtomicBool::new(false)),
200 instruments_cache: Arc::new(DashMap::new()),
201 account_id: Some(account_id),
202 heartbeat: _heartbeat,
203 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
204 out_rx: None,
205 handler_task: None,
206 }
207 }
208
209 #[must_use]
211 pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
212 self.credential.as_ref()
213 }
214
215 #[must_use]
217 pub fn is_connected(&self) -> bool {
218 let mode = self.connection_mode.load();
219 let mode_u8 = mode.load(Ordering::Relaxed);
220 matches!(
221 mode_u8,
222 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
223 )
224 }
225
226 #[must_use]
228 pub fn url(&self) -> &str {
229 &self.url
230 }
231
232 #[must_use]
236 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
237 self.connection_mode.clone()
238 }
239
240 pub fn set_account_id(&mut self, account_id: AccountId) {
242 self.account_id = Some(account_id);
243 }
244
245 #[must_use]
247 pub fn account_id(&self) -> Option<AccountId> {
248 self.account_id
249 }
250
251 pub fn cache_instrument(&self, instrument: InstrumentAny) {
255 let symbol = instrument.id().symbol.inner();
256 self.instruments_cache.insert(symbol, instrument.clone());
257
258 if let Ok(cmd_tx) = self.cmd_tx.try_read()
261 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
262 {
263 log::debug!("Failed to send UpdateInstrument command to handler: {e}");
264 }
265 }
266
267 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
271 for instrument in &instruments {
272 self.instruments_cache
273 .insert(instrument.id().symbol.inner(), instrument.clone());
274 }
275
276 if !instruments.is_empty()
279 && let Ok(cmd_tx) = self.cmd_tx.try_read()
280 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
281 {
282 log::debug!("Failed to send InitializeInstruments command to handler: {e}");
283 }
284 }
285
286 #[must_use]
288 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
289 &self.instruments_cache
290 }
291
292 #[must_use]
296 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
297 self.instruments_cache.get(symbol).map(|r| r.clone())
298 }
299
300 pub fn take_receiver(
303 &mut self,
304 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
305 self.out_rx.take()
306 }
307
308 pub async fn connect(&mut self) -> DydxWsResult<()> {
317 if self.is_connected() {
318 return Ok(());
319 }
320
321 self.signal.store(false, Ordering::Relaxed);
323
324 let (message_handler, raw_rx) = channel_message_handler();
325
326 let cfg = WebSocketConfig {
327 url: self.url.clone(),
328 headers: vec![],
329 heartbeat: self.heartbeat,
330 heartbeat_msg: None,
331 reconnect_timeout_ms: Some(15_000),
332 reconnect_delay_initial_ms: Some(250),
333 reconnect_delay_max_ms: Some(5_000),
334 reconnect_backoff_factor: Some(2.0),
335 reconnect_jitter_ms: Some(200),
336 reconnect_max_attempts: None,
337 };
338
339 let client = WebSocketClient::connect(
340 cfg,
341 Some(message_handler),
342 None,
343 None,
344 vec![],
345 Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
346 )
347 .await
348 .map_err(|e| DydxWsError::Transport(e.to_string()))?;
349
350 self.connection_mode.store(client.connection_mode_atomic());
352
353 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
355 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
356
357 {
359 let mut guard = self.cmd_tx.write().await;
360 *guard = cmd_tx;
361 }
362 self.out_rx = Some(out_rx);
363
364 if !self.instruments_cache.is_empty() {
366 let cached_instruments: Vec<InstrumentAny> = self
367 .instruments_cache
368 .iter()
369 .map(|entry| entry.value().clone())
370 .collect();
371 let cmd_tx_guard = self.cmd_tx.read().await;
372 if let Err(e) =
373 cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
374 {
375 log::error!("Failed to replay instruments to handler: {e}");
376 }
377 }
378
379 let account_id = self.account_id;
381 let signal = self.signal.clone();
382 let subscriptions = self.subscriptions.clone();
383
384 let handler_task = get_runtime().spawn(async move {
385 let mut handler = FeedHandler::new(
386 account_id,
387 cmd_rx,
388 out_tx,
389 raw_rx,
390 client,
391 signal,
392 subscriptions,
393 );
394 handler.run().await;
395 });
396
397 self.handler_task = Some(handler_task);
398 log::info!("Connected dYdX WebSocket: {}", self.url);
399 Ok(())
400 }
401
402 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
408 self.signal.store(true, Ordering::Relaxed);
410
411 self.connection_mode
414 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
415
416 if let Some(handle) = self.handler_task.take() {
418 handle.abort();
419 }
420
421 self.out_rx = None;
423
424 log::info!("Disconnected dYdX WebSocket");
425 Ok(())
426 }
427
428 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
430 self.cmd_tx
431 .read()
432 .await
433 .send(HandlerCommand::SendText(text.to_string()))
434 .map_err(|e| {
435 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
436 })?;
437 Ok(())
438 }
439
440 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
446 if let Ok(guard) = self.cmd_tx.try_read() {
447 guard.send(cmd).map_err(|e| {
448 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
449 })?;
450 } else {
451 return Err(DydxWsError::Transport(
452 "Failed to acquire lock on command channel".to_string(),
453 ));
454 }
455 Ok(())
456 }
457
458 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
459 let mut s = instrument_id.symbol.as_str().to_string();
460 if let Some(stripped) = s.strip_suffix("-PERP") {
461 s = stripped.to_string();
462 }
463 s
464 }
465
466 fn topic(channel: super::enums::DydxWsChannel, id: Option<&str>) -> String {
467 match id {
468 Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
469 None => channel.as_ref().to_string(),
470 }
471 }
472
473 async fn send_and_track_subscribe(
474 &self,
475 sub: super::messages::DydxSubscription,
476 topic: &str,
477 ) -> DydxWsResult<()> {
478 self.subscriptions.mark_subscribe(topic);
479 let payload = serde_json::to_string(&sub)?;
480 if let Err(e) = self.send_text_inner(&payload).await {
481 self.subscriptions.mark_failure(topic);
482 self.subscriptions.remove_reference(topic);
483 return Err(e);
484 }
485 Ok(())
486 }
487
488 async fn send_and_track_unsubscribe(
489 &self,
490 sub: super::messages::DydxSubscription,
491 topic: &str,
492 ) -> DydxWsResult<()> {
493 self.subscriptions.mark_unsubscribe(topic);
494 let payload = serde_json::to_string(&sub)?;
495 if let Err(e) = self.send_text_inner(&payload).await {
496 self.subscriptions.add_reference(topic);
498 self.subscriptions.mark_subscribe(topic);
499 return Err(e);
500 }
501 Ok(())
502 }
503
504 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
514 let ticker = Self::ticker_from_instrument_id(&instrument_id);
515 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
516 if !self.subscriptions.add_reference(&topic) {
517 return Ok(());
518 }
519
520 let sub = super::messages::DydxSubscription {
521 op: super::enums::DydxWsOperation::Subscribe,
522 channel: super::enums::DydxWsChannel::Trades,
523 id: Some(ticker),
524 };
525
526 self.send_and_track_subscribe(sub, &topic).await
527 }
528
529 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
535 let ticker = Self::ticker_from_instrument_id(&instrument_id);
536 let topic = Self::topic(super::enums::DydxWsChannel::Trades, Some(&ticker));
537 if !self.subscriptions.remove_reference(&topic) {
538 return Ok(());
539 }
540
541 let sub = super::messages::DydxSubscription {
542 op: super::enums::DydxWsOperation::Unsubscribe,
543 channel: super::enums::DydxWsChannel::Trades,
544 id: Some(ticker),
545 };
546
547 self.send_and_track_unsubscribe(sub, &topic).await
548 }
549
550 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
560 let ticker = Self::ticker_from_instrument_id(&instrument_id);
561 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
562 if !self.subscriptions.add_reference(&topic) {
563 return Ok(());
564 }
565
566 let sub = super::messages::DydxSubscription {
567 op: super::enums::DydxWsOperation::Subscribe,
568 channel: super::enums::DydxWsChannel::Orderbook,
569 id: Some(ticker),
570 };
571
572 self.send_and_track_subscribe(sub, &topic).await
573 }
574
575 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
581 let ticker = Self::ticker_from_instrument_id(&instrument_id);
582 let topic = Self::topic(super::enums::DydxWsChannel::Orderbook, Some(&ticker));
583 if !self.subscriptions.remove_reference(&topic) {
584 return Ok(());
585 }
586
587 let sub = super::messages::DydxSubscription {
588 op: super::enums::DydxWsOperation::Unsubscribe,
589 channel: super::enums::DydxWsChannel::Orderbook,
590 id: Some(ticker),
591 };
592
593 self.send_and_track_unsubscribe(sub, &topic).await
594 }
595
596 pub async fn subscribe_candles(
606 &self,
607 instrument_id: InstrumentId,
608 resolution: &str,
609 ) -> DydxWsResult<()> {
610 let ticker = Self::ticker_from_instrument_id(&instrument_id);
611 let id = format!("{ticker}/{resolution}");
612 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
613 if !self.subscriptions.add_reference(&topic) {
614 return Ok(());
615 }
616
617 let sub = super::messages::DydxSubscription {
618 op: super::enums::DydxWsOperation::Subscribe,
619 channel: super::enums::DydxWsChannel::Candles,
620 id: Some(id),
621 };
622
623 self.send_and_track_subscribe(sub, &topic).await
624 }
625
626 pub async fn unsubscribe_candles(
632 &self,
633 instrument_id: InstrumentId,
634 resolution: &str,
635 ) -> DydxWsResult<()> {
636 let ticker = Self::ticker_from_instrument_id(&instrument_id);
637 let id = format!("{ticker}/{resolution}");
638 let topic = Self::topic(super::enums::DydxWsChannel::Candles, Some(&id));
639 if !self.subscriptions.remove_reference(&topic) {
640 return Ok(());
641 }
642
643 let sub = super::messages::DydxSubscription {
644 op: super::enums::DydxWsOperation::Unsubscribe,
645 channel: super::enums::DydxWsChannel::Candles,
646 id: Some(id),
647 };
648
649 self.send_and_track_unsubscribe(sub, &topic).await
650 }
651
652 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
662 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
663 if !self.subscriptions.add_reference(&topic) {
664 return Ok(());
665 }
666
667 let sub = super::messages::DydxSubscription {
668 op: super::enums::DydxWsOperation::Subscribe,
669 channel: super::enums::DydxWsChannel::Markets,
670 id: None,
671 };
672
673 self.send_and_track_subscribe(sub, &topic).await
674 }
675
676 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
682 let topic = Self::topic(super::enums::DydxWsChannel::Markets, None);
683 if !self.subscriptions.remove_reference(&topic) {
684 return Ok(());
685 }
686
687 let sub = super::messages::DydxSubscription {
688 op: super::enums::DydxWsOperation::Unsubscribe,
689 channel: super::enums::DydxWsChannel::Markets,
690 id: None,
691 };
692
693 self.send_and_track_unsubscribe(sub, &topic).await
694 }
695
696 pub async fn subscribe_subaccount(
710 &self,
711 address: &str,
712 subaccount_number: u32,
713 ) -> DydxWsResult<()> {
714 if !self.requires_auth {
715 return Err(DydxWsError::Authentication(
716 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
717 ));
718 }
719 let id = format!("{address}/{subaccount_number}");
720 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
721 if !self.subscriptions.add_reference(&topic) {
722 return Ok(());
723 }
724
725 let sub = super::messages::DydxSubscription {
726 op: super::enums::DydxWsOperation::Subscribe,
727 channel: super::enums::DydxWsChannel::Subaccounts,
728 id: Some(id),
729 };
730
731 self.send_and_track_subscribe(sub, &topic).await
732 }
733
734 pub async fn unsubscribe_subaccount(
740 &self,
741 address: &str,
742 subaccount_number: u32,
743 ) -> DydxWsResult<()> {
744 let id = format!("{address}/{subaccount_number}");
745 let topic = Self::topic(super::enums::DydxWsChannel::Subaccounts, Some(&id));
746 if !self.subscriptions.remove_reference(&topic) {
747 return Ok(());
748 }
749
750 let sub = super::messages::DydxSubscription {
751 op: super::enums::DydxWsOperation::Unsubscribe,
752 channel: super::enums::DydxWsChannel::Subaccounts,
753 id: Some(id),
754 };
755
756 self.send_and_track_unsubscribe(sub, &topic).await
757 }
758
759 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
769 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
770 if !self.subscriptions.add_reference(&topic) {
771 return Ok(());
772 }
773
774 let sub = super::messages::DydxSubscription {
775 op: super::enums::DydxWsOperation::Subscribe,
776 channel: super::enums::DydxWsChannel::BlockHeight,
777 id: None,
778 };
779
780 self.send_and_track_subscribe(sub, &topic).await
781 }
782
783 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
789 let topic = Self::topic(super::enums::DydxWsChannel::BlockHeight, None);
790 if !self.subscriptions.remove_reference(&topic) {
791 return Ok(());
792 }
793
794 let sub = super::messages::DydxSubscription {
795 op: super::enums::DydxWsOperation::Unsubscribe,
796 channel: super::enums::DydxWsChannel::BlockHeight,
797 id: None,
798 };
799
800 self.send_and_track_unsubscribe(sub, &topic).await
801 }
802}