1pub static DYDX_RATE_LIMIT_KEY_SUBSCRIPTION: LazyLock<[Ustr; 1]> =
37 LazyLock::new(|| [Ustr::from("subscription")]);
38
39pub const DYDX_WS_TOPIC_DELIMITER: char = ':';
41
42pub static DYDX_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
44 LazyLock::new(|| Quota::per_second(NonZeroU32::new(2).expect("non-zero")));
45
46use std::{
47 num::NonZeroU32,
48 sync::{
49 Arc, LazyLock,
50 atomic::{AtomicBool, AtomicU8, Ordering},
51 },
52};
53
54use arc_swap::ArcSwap;
55use nautilus_common::live::get_runtime;
56use nautilus_model::{
57 identifiers::{AccountId, InstrumentId},
58 instruments::InstrumentAny,
59};
60use nautilus_network::{
61 mode::ConnectionMode,
62 ratelimiter::quota::Quota,
63 websocket::{
64 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
65 },
66};
67use ustr::Ustr;
68
69use super::{
70 enums::{DydxWsChannel, DydxWsOperation, NautilusWsMessage},
71 error::{DydxWsError, DydxWsResult},
72 handler::{FeedHandler, HandlerCommand},
73 messages::DydxSubscription,
74};
75use crate::{
76 common::{credential::DydxCredential, instrument_cache::InstrumentCache},
77 execution::encoder::ClientOrderIdEncoder,
78};
79
/// WebSocket client for the dYdX streaming API.
///
/// The client owns the connection settings and the channels used to talk to a background
/// `FeedHandler` task, which is spawned by `connect` and aborted by `disconnect`.
/// Clones share the subscription state, connection mode, stop signal, instrument cache,
/// command sender and encoder, but never the output receiver or the handler task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
)]
pub struct DydxWebSocketClient {
    /// Endpoint URL used when opening the connection.
    url: String,
    /// Credential supplied to the private constructors; `None` for public clients.
    credential: Option<Arc<DydxCredential>>,
    /// `true` only for clients built through the private constructors; gates subaccount subscriptions.
    requires_auth: bool,
    /// Authentication tracker created at construction (not otherwise used in the visible code).
    auth_tracker: AuthTracker,
    /// Per-topic reference counts and subscribe/unsubscribe state, shared with the handler.
    subscriptions: SubscriptionState,
    /// Swappable handle to the atomic connection mode; replaced by the transport's atomic on
    /// connect and by a fresh `Closed` value on disconnect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag shared with the handler: cleared on connect, set on disconnect.
    signal: Arc<AtomicBool>,
    /// Instrument cache, replayed to the handler on connect.
    instrument_cache: Arc<InstrumentCache>,
    /// Account identifier forwarded to the handler; `None` for public clients until set.
    account_id: Option<AccountId>,
    /// Heartbeat setting forwarded to the WebSocket config (units not shown here — confirm).
    heartbeat: Option<u64>,
    /// Sender for handler commands; a placeholder with no receiver until `connect` swaps in a live one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler; `None` before connect and once taken.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Join handle of the spawned handler task, if any.
    handler_task: Option<tokio::task::JoinHandle<()>>,
    /// Flag forwarded to the handler on connect; defaults to `true`.
    bars_timestamp_on_close: bool,
    /// Shared client order ID encoder.
    encoder: Arc<ClientOrderIdEncoder>,
}
143
144impl Clone for DydxWebSocketClient {
145 fn clone(&self) -> Self {
146 Self {
147 url: self.url.clone(),
148 credential: self.credential.clone(),
149 requires_auth: self.requires_auth,
150 auth_tracker: self.auth_tracker.clone(),
151 subscriptions: self.subscriptions.clone(),
152 connection_mode: self.connection_mode.clone(),
153 signal: self.signal.clone(),
154 instrument_cache: self.instrument_cache.clone(),
155 account_id: self.account_id,
156 heartbeat: self.heartbeat,
157 cmd_tx: self.cmd_tx.clone(),
158 out_rx: None, handler_task: None, bars_timestamp_on_close: self.bars_timestamp_on_close,
161 encoder: self.encoder.clone(),
162 }
163 }
164}
165
166impl DydxWebSocketClient {
167 #[must_use]
172 pub fn new_public(url: String, heartbeat: Option<u64>) -> Self {
173 Self::new_public_with_cache(url, Arc::new(InstrumentCache::new()), heartbeat)
174 }
175
176 #[must_use]
180 pub fn new_public_with_cache(
181 url: String,
182 instrument_cache: Arc<InstrumentCache>,
183 heartbeat: Option<u64>,
184 ) -> Self {
185 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
187
188 Self {
189 url,
190 credential: None,
191 requires_auth: false,
192 auth_tracker: AuthTracker::new(),
193 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
194 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
195 ConnectionMode::Closed as u8,
196 ))),
197 signal: Arc::new(AtomicBool::new(false)),
198 instrument_cache,
199 account_id: None,
200 heartbeat,
201 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
202 out_rx: None,
203 handler_task: None,
204 bars_timestamp_on_close: true,
205 encoder: Arc::new(ClientOrderIdEncoder::new()),
206 }
207 }
208
209 #[must_use]
214 pub fn new_private(
215 url: String,
216 credential: DydxCredential,
217 account_id: AccountId,
218 heartbeat: Option<u64>,
219 ) -> Self {
220 Self::new_private_with_cache(
221 url,
222 credential,
223 account_id,
224 Arc::new(InstrumentCache::new()),
225 heartbeat,
226 )
227 }
228
229 #[must_use]
233 pub fn new_private_with_cache(
234 url: String,
235 credential: DydxCredential,
236 account_id: AccountId,
237 instrument_cache: Arc<InstrumentCache>,
238 heartbeat: Option<u64>,
239 ) -> Self {
240 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
242
243 Self {
244 url,
245 credential: Some(Arc::new(credential)),
246 requires_auth: true,
247 auth_tracker: AuthTracker::new(),
248 subscriptions: SubscriptionState::new(DYDX_WS_TOPIC_DELIMITER),
249 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
250 ConnectionMode::Closed as u8,
251 ))),
252 signal: Arc::new(AtomicBool::new(false)),
253 instrument_cache,
254 account_id: Some(account_id),
255 heartbeat,
256 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
257 out_rx: None,
258 handler_task: None,
259 bars_timestamp_on_close: true,
260 encoder: Arc::new(ClientOrderIdEncoder::new()),
261 }
262 }
263
    /// Returns the credential, or `None` for a client built without one.
    #[must_use]
    pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
        self.credential.as_ref()
    }
269
270 #[must_use]
272 pub fn is_connected(&self) -> bool {
273 let mode = self.connection_mode.load();
274 let mode_u8 = mode.load(Ordering::Relaxed);
275 matches!(
276 mode_u8,
277 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
278 )
279 }
280
281 #[must_use]
283 pub fn url(&self) -> &str {
284 &self.url
285 }
286
287 #[must_use]
291 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
292 self.connection_mode.clone()
293 }
294
    /// Sets the `bars_timestamp_on_close` flag.
    ///
    /// The value is read when `connect` builds the handler, so changing it afterwards does
    /// not affect an already running handler.
    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
        self.bars_timestamp_on_close = value;
    }
299
    /// Sets the account ID, replacing any previous value.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
304
    /// Returns the account ID, if one has been set.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
310
    /// Replaces the instrument cache used by this client instance.
    ///
    /// Only this instance's handle is swapped; existing clones keep the cache they hold.
    pub fn set_instrument_cache(&mut self, cache: Arc<InstrumentCache>) {
        self.instrument_cache = cache;
    }
319
320 pub fn cache_instrument(&self, instrument: InstrumentAny) {
325 self.instrument_cache
326 .insert_instrument_only(instrument.clone());
327
328 if let Ok(cmd_tx) = self.cmd_tx.try_read()
331 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
332 {
333 log::debug!("Failed to send UpdateInstrument command to handler: {e}");
334 }
335 }
336
337 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
342 log::debug!(
343 "Caching {} instruments in WebSocket client",
344 instruments.len()
345 );
346 self.instrument_cache
347 .insert_instruments_only(instruments.clone());
348
349 if !instruments.is_empty()
352 && let Ok(cmd_tx) = self.cmd_tx.try_read()
353 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments))
354 {
355 log::debug!("Failed to send InitializeInstruments command to handler: {e}");
356 }
357 }
358
    /// Returns a reference to the shared instrument cache.
    #[must_use]
    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
        &self.instrument_cache
    }
364
    /// Returns a reference to the shared client order ID encoder.
    #[must_use]
    pub fn encoder(&self) -> &Arc<ClientOrderIdEncoder> {
        &self.encoder
    }
370
    /// Returns every instrument currently held by the instrument cache.
    #[must_use]
    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
        self.instrument_cache.all_instruments()
    }
378
    /// Returns the number of entries reported by the instrument cache.
    #[must_use]
    pub fn cached_instruments_count(&self) -> usize {
        self.instrument_cache.len()
    }
384
    /// Looks up an instrument in the cache by its instrument ID.
    ///
    /// Returns `None` when the cache has no entry for `instrument_id`.
    #[must_use]
    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
        self.instrument_cache.get(instrument_id)
    }
392
    /// Looks up an instrument in the cache by market ticker.
    ///
    /// Returns `None` when the cache has no entry for `ticker`.
    // NOTE(review): expected ticker format (e.g. with or without a "-PERP" suffix) is not
    // shown here — confirm against `InstrumentCache::get_by_market`.
    #[must_use]
    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
        self.instrument_cache.get_by_market(ticker)
    }
400
    /// Takes ownership of the output message receiver, leaving `None` in its place.
    ///
    /// Returns `None` if the client has not connected yet or the receiver was already taken.
    pub fn take_receiver(
        &mut self,
    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
        self.out_rx.take()
    }
408
409 pub fn stream(
417 &mut self,
418 ) -> impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static {
419 let mut rx = self
420 .out_rx
421 .take()
422 .expect("Message stream receiver already taken or not connected");
423
424 async_stream::stream! {
425 while let Some(msg) = rx.recv().await {
426 yield msg;
427 }
428 }
429 }
430
    /// Opens the WebSocket connection and spawns the background feed handler.
    ///
    /// Returns `Ok(())` immediately when [`Self::is_connected`] already reports a live
    /// connection. Otherwise it opens the socket, installs fresh command and output
    /// channels, replays any cached instruments to the handler, and spawns the handler on
    /// the runtime returned by `get_runtime()`.
    ///
    /// # Errors
    ///
    /// Returns [`DydxWsError::Transport`] if the underlying WebSocket connection fails.
    pub async fn connect(&mut self) -> DydxWsResult<()> {
        if self.is_connected() {
            return Ok(());
        }

        // Clear the stop flag set by a previous `disconnect`
        self.signal.store(false, Ordering::Relaxed);

        let (message_handler, raw_rx) = channel_message_handler();

        let cfg = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![],
            heartbeat: self.heartbeat,
            heartbeat_msg: None,
            reconnect_timeout_ms: Some(15_000),
            reconnect_delay_initial_ms: Some(250),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(2.0),
            reconnect_jitter_ms: Some(200),
            reconnect_max_attempts: None,
        };

        // NOTE(review): the meaning of the positional `None`/`vec![]` arguments is defined by
        // `WebSocketClient::connect` — confirm against its signature.
        let client = WebSocketClient::connect(
            cfg,
            Some(message_handler),
            None,
            None,
            vec![],
            Some(*DYDX_WS_SUBSCRIPTION_QUOTA),
        )
        .await
        .map_err(|e| DydxWsError::Transport(e.to_string()))?;

        // Adopt the transport's mode atomic so `is_connected` reflects the live connection
        self.connection_mode.store(client.connection_mode_atomic());

        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();

        // Swap the sender inside the shared lock so clones also reach the new handler;
        // the write guard is released at the end of this scope.
        {
            let mut guard = self.cmd_tx.write().await;
            *guard = cmd_tx;
        }
        self.out_rx = Some(out_rx);

        if self.instrument_cache.is_empty() {
            log::warn!("No cached instruments to replay to WebSocket handler");
        } else {
            // Queue the cached instruments before the handler starts consuming commands
            let cached_instruments = self.instrument_cache.all_instruments();
            log::debug!(
                "Replaying {} cached instruments to WebSocket handler",
                cached_instruments.len()
            );
            let cmd_tx_guard = self.cmd_tx.read().await;
            if let Err(e) =
                cmd_tx_guard.send(HandlerCommand::InitializeInstruments(cached_instruments))
            {
                log::error!("Failed to replay instruments to handler: {e}");
            }
        }

        // Copy or clone everything the spawned task needs, since it cannot borrow `self`
        let account_id = self.account_id;
        let signal = self.signal.clone();
        let subscriptions = self.subscriptions.clone();
        let bars_timestamp_on_close = self.bars_timestamp_on_close;

        let handler_task = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                account_id,
                cmd_rx,
                out_tx,
                raw_rx,
                client,
                signal,
                subscriptions,
                bars_timestamp_on_close,
            );
            handler.run().await;
        });

        self.handler_task = Some(handler_task);
        log::info!("Connected dYdX WebSocket: {}", self.url);
        Ok(())
    }
528
529 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
535 self.signal.store(true, Ordering::Relaxed);
537
538 self.connection_mode
541 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
542
543 if let Some(handle) = self.handler_task.take() {
545 handle.abort();
546 }
547
548 self.out_rx = None;
550
551 log::info!("Disconnected dYdX WebSocket");
552 Ok(())
553 }
554
555 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
557 self.cmd_tx
558 .read()
559 .await
560 .send(HandlerCommand::SendText(text.to_string()))
561 .map_err(|e| {
562 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
563 })?;
564 Ok(())
565 }
566
567 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
573 if let Ok(guard) = self.cmd_tx.try_read() {
574 guard.send(cmd).map_err(|e| {
575 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
576 })?;
577 } else {
578 return Err(DydxWsError::Transport(
579 "Failed to acquire lock on command channel".to_string(),
580 ));
581 }
582 Ok(())
583 }
584
585 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
586 let mut s = instrument_id.symbol.as_str().to_string();
587 if let Some(stripped) = s.strip_suffix("-PERP") {
588 s = stripped.to_string();
589 }
590 s
591 }
592
593 fn topic(channel: DydxWsChannel, id: Option<&str>) -> String {
594 match id {
595 Some(id) => format!("{}{}{}", channel.as_ref(), DYDX_WS_TOPIC_DELIMITER, id),
596 None => channel.as_ref().to_string(),
597 }
598 }
599
600 async fn send_and_track_subscribe(
601 &self,
602 sub: DydxSubscription,
603 topic: &str,
604 ) -> DydxWsResult<()> {
605 self.subscriptions.mark_subscribe(topic);
606
607 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
608 let _ = cmd_tx.send(HandlerCommand::RegisterSubscription {
609 topic: topic.to_string(),
610 subscription: sub.clone(),
611 });
612 }
613
614 let payload = serde_json::to_string(&sub)?;
615 if let Err(e) = self.send_text_inner(&payload).await {
616 self.subscriptions.mark_failure(topic);
617 self.subscriptions.remove_reference(topic);
618 return Err(e);
619 }
620 Ok(())
621 }
622
623 async fn send_and_track_unsubscribe(
624 &self,
625 sub: DydxSubscription,
626 topic: &str,
627 ) -> DydxWsResult<()> {
628 self.subscriptions.mark_unsubscribe(topic);
629
630 let payload = serde_json::to_string(&sub)?;
631 if let Err(e) = self.send_text_inner(&payload).await {
632 self.subscriptions.add_reference(topic);
633 self.subscriptions.mark_subscribe(topic);
634 return Err(e);
635 }
636
637 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
638 let _ = cmd_tx.send(HandlerCommand::UnregisterSubscription {
639 topic: topic.to_string(),
640 });
641 }
642
643 Ok(())
644 }
645
646 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
656 let ticker = Self::ticker_from_instrument_id(&instrument_id);
657 let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
658 if !self.subscriptions.add_reference(&topic) {
659 return Ok(());
660 }
661
662 let sub = DydxSubscription {
663 op: DydxWsOperation::Subscribe,
664 channel: DydxWsChannel::Trades,
665 id: Some(ticker),
666 };
667
668 self.send_and_track_subscribe(sub, &topic).await
669 }
670
671 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
677 let ticker = Self::ticker_from_instrument_id(&instrument_id);
678 let topic = Self::topic(DydxWsChannel::Trades, Some(&ticker));
679 if !self.subscriptions.remove_reference(&topic) {
680 return Ok(());
681 }
682
683 let sub = DydxSubscription {
684 op: DydxWsOperation::Unsubscribe,
685 channel: DydxWsChannel::Trades,
686 id: Some(ticker),
687 };
688
689 self.send_and_track_unsubscribe(sub, &topic).await
690 }
691
692 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
702 let ticker = Self::ticker_from_instrument_id(&instrument_id);
703 let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
704 if !self.subscriptions.add_reference(&topic) {
705 return Ok(());
706 }
707
708 let sub = DydxSubscription {
709 op: DydxWsOperation::Subscribe,
710 channel: DydxWsChannel::Orderbook,
711 id: Some(ticker),
712 };
713
714 self.send_and_track_subscribe(sub, &topic).await
715 }
716
717 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
723 let ticker = Self::ticker_from_instrument_id(&instrument_id);
724 let topic = Self::topic(DydxWsChannel::Orderbook, Some(&ticker));
725 if !self.subscriptions.remove_reference(&topic) {
726 return Ok(());
727 }
728
729 let sub = DydxSubscription {
730 op: DydxWsOperation::Unsubscribe,
731 channel: DydxWsChannel::Orderbook,
732 id: Some(ticker),
733 };
734
735 self.send_and_track_unsubscribe(sub, &topic).await
736 }
737
738 pub async fn subscribe_candles(
748 &self,
749 instrument_id: InstrumentId,
750 resolution: &str,
751 ) -> DydxWsResult<()> {
752 let ticker = Self::ticker_from_instrument_id(&instrument_id);
753 let id = format!("{ticker}/{resolution}");
754 let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
755 if !self.subscriptions.add_reference(&topic) {
756 return Ok(());
757 }
758
759 let sub = DydxSubscription {
760 op: DydxWsOperation::Subscribe,
761 channel: DydxWsChannel::Candles,
762 id: Some(id),
763 };
764
765 self.send_and_track_subscribe(sub, &topic).await
766 }
767
768 pub async fn unsubscribe_candles(
774 &self,
775 instrument_id: InstrumentId,
776 resolution: &str,
777 ) -> DydxWsResult<()> {
778 let ticker = Self::ticker_from_instrument_id(&instrument_id);
779 let id = format!("{ticker}/{resolution}");
780 let topic = Self::topic(DydxWsChannel::Candles, Some(&id));
781 if !self.subscriptions.remove_reference(&topic) {
782 return Ok(());
783 }
784
785 let sub = DydxSubscription {
786 op: DydxWsOperation::Unsubscribe,
787 channel: DydxWsChannel::Candles,
788 id: Some(id),
789 };
790
791 self.send_and_track_unsubscribe(sub, &topic).await
792 }
793
794 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
804 let topic = Self::topic(DydxWsChannel::Markets, None);
805 if !self.subscriptions.add_reference(&topic) {
806 return Ok(());
807 }
808
809 let sub = DydxSubscription {
810 op: DydxWsOperation::Subscribe,
811 channel: DydxWsChannel::Markets,
812 id: None,
813 };
814
815 self.send_and_track_subscribe(sub, &topic).await
816 }
817
818 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
824 let topic = Self::topic(DydxWsChannel::Markets, None);
825 if !self.subscriptions.remove_reference(&topic) {
826 return Ok(());
827 }
828
829 let sub = DydxSubscription {
830 op: DydxWsOperation::Unsubscribe,
831 channel: DydxWsChannel::Markets,
832 id: None,
833 };
834
835 self.send_and_track_unsubscribe(sub, &topic).await
836 }
837
838 pub async fn subscribe_subaccount(
852 &self,
853 address: &str,
854 subaccount_number: u32,
855 ) -> DydxWsResult<()> {
856 if !self.requires_auth {
857 return Err(DydxWsError::Authentication(
858 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
859 ));
860 }
861 let id = format!("{address}/{subaccount_number}");
862 let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
863 if !self.subscriptions.add_reference(&topic) {
864 return Ok(());
865 }
866
867 let sub = DydxSubscription {
868 op: DydxWsOperation::Subscribe,
869 channel: DydxWsChannel::Subaccounts,
870 id: Some(id),
871 };
872
873 self.send_and_track_subscribe(sub, &topic).await
874 }
875
876 pub async fn unsubscribe_subaccount(
882 &self,
883 address: &str,
884 subaccount_number: u32,
885 ) -> DydxWsResult<()> {
886 let id = format!("{address}/{subaccount_number}");
887 let topic = Self::topic(DydxWsChannel::Subaccounts, Some(&id));
888 if !self.subscriptions.remove_reference(&topic) {
889 return Ok(());
890 }
891
892 let sub = DydxSubscription {
893 op: DydxWsOperation::Unsubscribe,
894 channel: DydxWsChannel::Subaccounts,
895 id: Some(id),
896 };
897
898 self.send_and_track_unsubscribe(sub, &topic).await
899 }
900
901 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
911 let topic = Self::topic(DydxWsChannel::BlockHeight, None);
912 if !self.subscriptions.add_reference(&topic) {
913 return Ok(());
914 }
915
916 let sub = DydxSubscription {
917 op: DydxWsOperation::Subscribe,
918 channel: DydxWsChannel::BlockHeight,
919 id: None,
920 };
921
922 self.send_and_track_subscribe(sub, &topic).await
923 }
924
925 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
931 let topic = Self::topic(DydxWsChannel::BlockHeight, None);
932 if !self.subscriptions.remove_reference(&topic) {
933 return Ok(());
934 }
935
936 let sub = DydxSubscription {
937 op: DydxWsOperation::Unsubscribe,
938 channel: DydxWsChannel::BlockHeight,
939 id: None,
940 };
941
942 self.send_and_track_unsubscribe(sub, &topic).await
943 }
944}