1use std::sync::{
33 Arc,
34 atomic::{AtomicBool, AtomicU8, Ordering},
35};
36
37use arc_swap::ArcSwap;
38use dashmap::DashMap;
39use nautilus_model::{
40 identifiers::{AccountId, InstrumentId},
41 instruments::{Instrument, InstrumentAny},
42};
43use nautilus_network::{
44 mode::ConnectionMode,
45 websocket::{
46 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
47 },
48};
49use ustr::Ustr;
50
51use super::{
52 error::{DydxWsError, DydxWsResult},
53 handler::{FeedHandler, HandlerCommand},
54 messages::NautilusWsMessage,
55};
56use crate::common::credential::DydxCredential;
57
58#[derive(Debug)]
82#[cfg_attr(
83 feature = "python",
84 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
85)]
86pub struct DydxWebSocketClient {
87 url: String,
89 credential: Option<Arc<DydxCredential>>,
91 requires_auth: bool,
93 #[allow(dead_code)]
95 auth_tracker: AuthTracker,
96 #[allow(dead_code)]
98 subscriptions: SubscriptionState,
99 connection_mode: Arc<ArcSwap<AtomicU8>>,
101 signal: Arc<AtomicBool>,
103 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
105 account_id: Option<AccountId>,
107 heartbeat: Option<u64>,
109 cmd_tx: Arc<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>,
111 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
113 handler_task: Option<tokio::task::JoinHandle<()>>,
115}
116
117impl Clone for DydxWebSocketClient {
118 fn clone(&self) -> Self {
119 Self {
120 url: self.url.clone(),
121 credential: self.credential.clone(),
122 requires_auth: self.requires_auth,
123 auth_tracker: AuthTracker::new(),
124 subscriptions: SubscriptionState::new(':'),
125 connection_mode: self.connection_mode.clone(),
126 signal: self.signal.clone(),
127 instruments_cache: self.instruments_cache.clone(),
128 account_id: self.account_id,
129 heartbeat: self.heartbeat,
130 cmd_tx: self.cmd_tx.clone(),
131 out_rx: None, handler_task: None, }
134 }
135}
136
137impl DydxWebSocketClient {
138 #[must_use]
140 pub fn new_public(url: String, _heartbeat: Option<u64>) -> Self {
141 use std::sync::atomic::AtomicU8;
142
143 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
145
146 Self {
147 url,
148 credential: None,
149 requires_auth: false,
150 auth_tracker: AuthTracker::new(),
151 subscriptions: SubscriptionState::new(':'), connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
153 ConnectionMode::Closed as u8,
154 ))),
155 signal: Arc::new(AtomicBool::new(false)),
156 instruments_cache: Arc::new(DashMap::new()),
157 account_id: None,
158 heartbeat: _heartbeat,
159 cmd_tx: Arc::new(cmd_tx),
160 out_rx: None,
161 handler_task: None,
162 }
163 }
164
165 #[must_use]
167 pub fn new_private(
168 url: String,
169 credential: DydxCredential,
170 account_id: AccountId,
171 _heartbeat: Option<u64>,
172 ) -> Self {
173 use std::sync::atomic::AtomicU8;
174
175 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
177
178 Self {
179 url,
180 credential: Some(Arc::new(credential)),
181 requires_auth: true,
182 auth_tracker: AuthTracker::new(),
183 subscriptions: SubscriptionState::new(':'), connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
185 ConnectionMode::Closed as u8,
186 ))),
187 signal: Arc::new(AtomicBool::new(false)),
188 instruments_cache: Arc::new(DashMap::new()),
189 account_id: Some(account_id),
190 heartbeat: _heartbeat,
191 cmd_tx: Arc::new(cmd_tx),
192 out_rx: None,
193 handler_task: None,
194 }
195 }
196
197 #[must_use]
199 pub fn credential(&self) -> Option<&Arc<DydxCredential>> {
200 self.credential.as_ref()
201 }
202
203 #[must_use]
205 pub fn is_connected(&self) -> bool {
206 let mode = self.connection_mode.load();
207 let mode_u8 = mode.load(Ordering::Relaxed);
208 matches!(
209 mode_u8,
210 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
211 )
212 }
213
214 #[must_use]
216 pub fn url(&self) -> &str {
217 &self.url
218 }
219
220 #[must_use]
224 pub fn connection_mode_atomic(&self) -> Arc<ArcSwap<AtomicU8>> {
225 self.connection_mode.clone()
226 }
227
228 pub fn set_account_id(&mut self, account_id: AccountId) {
230 self.account_id = Some(account_id);
231 }
232
233 #[must_use]
235 pub fn account_id(&self) -> Option<AccountId> {
236 self.account_id
237 }
238
239 pub fn cache_instrument(&self, instrument: InstrumentAny) {
243 let symbol = instrument.id().symbol.inner();
244 self.instruments_cache.insert(symbol, instrument.clone());
245
246 if let Err(e) = self
248 .cmd_tx
249 .send(HandlerCommand::UpdateInstrument(Box::new(instrument)))
250 {
251 tracing::debug!("Failed to send UpdateInstrument command to handler: {e}");
252 }
253 }
254
255 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
259 for instrument in &instruments {
260 self.instruments_cache
261 .insert(instrument.id().symbol.inner(), instrument.clone());
262 }
263
264 if let Err(e) = self
266 .cmd_tx
267 .send(HandlerCommand::InitializeInstruments(instruments))
268 {
269 tracing::debug!("Failed to send InitializeInstruments command to handler: {e}");
270 }
271 }
272
273 #[must_use]
275 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
276 &self.instruments_cache
277 }
278
279 #[must_use]
283 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
284 self.instruments_cache.get(symbol).map(|r| r.clone())
285 }
286
287 pub fn take_receiver(
290 &mut self,
291 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>> {
292 self.out_rx.take()
293 }
294
295 pub async fn connect(&mut self) -> DydxWsResult<()> {
304 if self.is_connected() {
305 return Ok(());
306 }
307
308 let (message_handler, raw_rx) = channel_message_handler();
309
310 let cfg = WebSocketConfig {
311 url: self.url.clone(),
312 headers: vec![],
313 message_handler: Some(message_handler),
314 heartbeat: self.heartbeat,
315 heartbeat_msg: None,
316 ping_handler: None,
317 reconnect_timeout_ms: Some(15_000),
318 reconnect_delay_initial_ms: Some(250),
319 reconnect_delay_max_ms: Some(5_000),
320 reconnect_backoff_factor: Some(2.0),
321 reconnect_jitter_ms: Some(200),
322 reconnect_max_attempts: None,
323 };
324
325 let client = WebSocketClient::connect(cfg, None, vec![], None)
326 .await
327 .map_err(|e| DydxWsError::Transport(e.to_string()))?;
328
329 self.connection_mode.store(client.connection_mode_atomic());
331
332 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
334 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
335
336 self.cmd_tx = Arc::new(cmd_tx.clone());
337 self.out_rx = Some(out_rx);
338
339 if !self.instruments_cache.is_empty() {
341 let cached_instruments: Vec<InstrumentAny> = self
342 .instruments_cache
343 .iter()
344 .map(|entry| entry.value().clone())
345 .collect();
346 if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
347 tracing::error!("Failed to replay instruments to handler: {e}");
348 }
349 }
350
351 let account_id = self.account_id;
353 let signal = self.signal.clone();
354
355 let handler_task = tokio::spawn(async move {
356 let mut handler = FeedHandler::new(account_id, cmd_rx, out_tx, raw_rx, client, signal);
357 handler.run().await;
358 });
359
360 self.handler_task = Some(handler_task);
361 tracing::info!("Connected dYdX WebSocket: {}", self.url);
362 Ok(())
363 }
364
365 pub async fn disconnect(&mut self) -> DydxWsResult<()> {
371 self.signal.store(true, Ordering::Relaxed);
373
374 if let Some(handle) = self.handler_task.take() {
376 handle.abort();
377 }
378
379 self.out_rx = None;
381
382 tracing::info!("Disconnected dYdX WebSocket");
383 Ok(())
384 }
385
386 async fn send_text_inner(&self, text: &str) -> DydxWsResult<()> {
388 self.cmd_tx
389 .send(HandlerCommand::SendText(text.to_string()))
390 .map_err(|e| {
391 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
392 })?;
393 Ok(())
394 }
395
396 pub fn send_command(&self, cmd: HandlerCommand) -> DydxWsResult<()> {
402 self.cmd_tx.send(cmd).map_err(|e| {
403 DydxWsError::Transport(format!("Failed to send command to handler: {e}"))
404 })?;
405 Ok(())
406 }
407
408 fn ticker_from_instrument_id(instrument_id: &InstrumentId) -> String {
409 let mut s = instrument_id.symbol.as_str().to_string();
410 if let Some(stripped) = s.strip_suffix("-PERP") {
411 s = stripped.to_string();
412 }
413 s
414 }
415
416 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
426 let ticker = Self::ticker_from_instrument_id(&instrument_id);
427 let sub = super::messages::DydxSubscription {
428 op: super::enums::DydxWsOperation::Subscribe,
429 channel: super::enums::DydxWsChannel::Trades,
430 id: Some(ticker),
431 };
432 let payload = serde_json::to_string(&sub)?;
433 self.send_text_inner(&payload).await
434 }
435
436 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
442 let ticker = Self::ticker_from_instrument_id(&instrument_id);
443 let sub = super::messages::DydxSubscription {
444 op: super::enums::DydxWsOperation::Unsubscribe,
445 channel: super::enums::DydxWsChannel::Trades,
446 id: Some(ticker),
447 };
448 let payload = serde_json::to_string(&sub)?;
449 self.send_text_inner(&payload).await
450 }
451
452 pub async fn subscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
462 let ticker = Self::ticker_from_instrument_id(&instrument_id);
463 let sub = super::messages::DydxSubscription {
464 op: super::enums::DydxWsOperation::Subscribe,
465 channel: super::enums::DydxWsChannel::Orderbook,
466 id: Some(ticker),
467 };
468 let payload = serde_json::to_string(&sub)?;
469 self.send_text_inner(&payload).await
470 }
471
472 pub async fn unsubscribe_orderbook(&self, instrument_id: InstrumentId) -> DydxWsResult<()> {
478 let ticker = Self::ticker_from_instrument_id(&instrument_id);
479 let sub = super::messages::DydxSubscription {
480 op: super::enums::DydxWsOperation::Unsubscribe,
481 channel: super::enums::DydxWsChannel::Orderbook,
482 id: Some(ticker),
483 };
484 let payload = serde_json::to_string(&sub)?;
485 self.send_text_inner(&payload).await
486 }
487
488 pub async fn subscribe_candles(
498 &self,
499 instrument_id: InstrumentId,
500 resolution: &str,
501 ) -> DydxWsResult<()> {
502 let ticker = Self::ticker_from_instrument_id(&instrument_id);
503 let id = format!("{ticker}/{resolution}");
504 let sub = super::messages::DydxSubscription {
505 op: super::enums::DydxWsOperation::Subscribe,
506 channel: super::enums::DydxWsChannel::Candles,
507 id: Some(id),
508 };
509 let payload = serde_json::to_string(&sub)?;
510 self.send_text_inner(&payload).await
511 }
512
513 pub async fn unsubscribe_candles(
519 &self,
520 instrument_id: InstrumentId,
521 resolution: &str,
522 ) -> DydxWsResult<()> {
523 let ticker = Self::ticker_from_instrument_id(&instrument_id);
524 let id = format!("{ticker}/{resolution}");
525 let sub = super::messages::DydxSubscription {
526 op: super::enums::DydxWsOperation::Unsubscribe,
527 channel: super::enums::DydxWsChannel::Candles,
528 id: Some(id),
529 };
530 let payload = serde_json::to_string(&sub)?;
531 self.send_text_inner(&payload).await
532 }
533
534 pub async fn subscribe_markets(&self) -> DydxWsResult<()> {
544 let sub = super::messages::DydxSubscription {
545 op: super::enums::DydxWsOperation::Subscribe,
546 channel: super::enums::DydxWsChannel::Markets,
547 id: None,
548 };
549 let payload = serde_json::to_string(&sub)?;
550 self.send_text_inner(&payload).await
551 }
552
553 pub async fn unsubscribe_markets(&self) -> DydxWsResult<()> {
559 let sub = super::messages::DydxSubscription {
560 op: super::enums::DydxWsOperation::Unsubscribe,
561 channel: super::enums::DydxWsChannel::Markets,
562 id: None,
563 };
564 let payload = serde_json::to_string(&sub)?;
565 self.send_text_inner(&payload).await
566 }
567
568 pub async fn subscribe_subaccount(
582 &self,
583 address: &str,
584 subaccount_number: u32,
585 ) -> DydxWsResult<()> {
586 if !self.requires_auth {
587 return Err(DydxWsError::Authentication(
588 "Subaccount subscriptions require authentication. Use new_private() to create an authenticated client".to_string(),
589 ));
590 }
591 let id = format!("{address}/{subaccount_number}");
592 let sub = super::messages::DydxSubscription {
593 op: super::enums::DydxWsOperation::Subscribe,
594 channel: super::enums::DydxWsChannel::Subaccounts,
595 id: Some(id),
596 };
597 let payload = serde_json::to_string(&sub)?;
598 self.send_text_inner(&payload).await
599 }
600
601 pub async fn unsubscribe_subaccount(
607 &self,
608 address: &str,
609 subaccount_number: u32,
610 ) -> DydxWsResult<()> {
611 let id = format!("{address}/{subaccount_number}");
612 let sub = super::messages::DydxSubscription {
613 op: super::enums::DydxWsOperation::Unsubscribe,
614 channel: super::enums::DydxWsChannel::Subaccounts,
615 id: Some(id),
616 };
617 let payload = serde_json::to_string(&sub)?;
618 self.send_text_inner(&payload).await
619 }
620
621 pub async fn subscribe_block_height(&self) -> DydxWsResult<()> {
631 let sub = super::messages::DydxSubscription {
632 op: super::enums::DydxWsOperation::Subscribe,
633 channel: super::enums::DydxWsChannel::BlockHeight,
634 id: None,
635 };
636 let payload = serde_json::to_string(&sub)?;
637 self.send_text_inner(&payload).await
638 }
639
640 pub async fn unsubscribe_block_height(&self) -> DydxWsResult<()> {
646 let sub = super::messages::DydxSubscription {
647 op: super::enums::DydxWsOperation::Unsubscribe,
648 channel: super::enums::DydxWsChannel::BlockHeight,
649 id: None,
650 };
651 let payload = serde_json::to_string(&sub)?;
652 self.send_text_inner(&payload).await
653 }
654}