1use std::{
25 collections::VecDeque,
26 fmt::Debug,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_core::time::get_atomic_clock_realtime;
35use nautilus_model::{
36 data::{
37 Bar, BarType, Data, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
38 },
39 identifiers::{AccountId, InstrumentId},
40 instruments::{Instrument, InstrumentAny},
41 types::Price,
42};
43use nautilus_network::{
44 RECONNECTED,
45 retry::{RetryManager, create_websocket_retry_manager},
46 websocket::{SubscriptionState, WebSocketClient},
47};
48use rust_decimal::Decimal;
49use tokio_tungstenite::tungstenite::Message;
50use ustr::Ustr;
51
52use super::{
53 DydxWsError, DydxWsResult,
54 client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
55 enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
56 error::DydxWebSocketError,
57 messages::{
58 DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
59 DydxSubscription, DydxTradeContents, DydxWsBlockHeightMessage, DydxWsCandlesMessage,
60 DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsFeedMessage,
61 DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
62 DydxWsParentSubaccountsMessage, DydxWsSubaccountsChannelContents,
63 DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage, DydxWsSubaccountsSubscribed,
64 DydxWsSubscriptionMsg, DydxWsTradesMessage,
65 },
66 parse as ws_parse,
67};
68use crate::common::parse::parse_instrument_id;
69
70#[derive(Debug, Clone)]
72pub enum HandlerCommand {
73 UpdateInstrument(Box<InstrumentAny>),
75 InitializeInstruments(Vec<InstrumentAny>),
77 RegisterBarType { topic: String, bar_type: BarType },
79 UnregisterBarType { topic: String },
81 RegisterSubscription {
83 topic: String,
84 subscription: DydxSubscription,
85 },
86 UnregisterSubscription { topic: String },
88 SendText(String),
90}
91
92pub struct FeedHandler {
97 account_id: Option<AccountId>,
99 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
101 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
103 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
105 client: WebSocketClient,
107 signal: Arc<AtomicBool>,
109 retry_manager: RetryManager<DydxWsError>,
111 instruments: AHashMap<Ustr, InstrumentAny>,
113 bar_types: AHashMap<String, BarType>,
115 subscriptions: SubscriptionState,
117 subscription_messages: AHashMap<String, DydxSubscription>,
119 message_buffer: VecDeque<NautilusWsMessage>,
121 book_sequence: AHashMap<String, u64>,
123 pending_bars: AHashMap<String, Bar>,
125 bars_timestamp_on_close: bool,
127}
128
129impl Debug for FeedHandler {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct(stringify!(FeedHandler))
132 .field("account_id", &self.account_id)
133 .field("instruments_count", &self.instruments.len())
134 .field("bar_types_count", &self.bar_types.len())
135 .finish_non_exhaustive()
136 }
137}
138
139impl FeedHandler {
140 #[must_use]
142 #[allow(clippy::too_many_arguments)]
143 pub fn new(
144 account_id: Option<AccountId>,
145 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
146 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
147 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
148 client: WebSocketClient,
149 signal: Arc<AtomicBool>,
150 subscriptions: SubscriptionState,
151 bars_timestamp_on_close: bool,
152 ) -> Self {
153 Self {
154 account_id,
155 cmd_rx,
156 out_tx,
157 raw_rx,
158 client,
159 signal,
160 retry_manager: create_websocket_retry_manager(),
161 instruments: AHashMap::new(),
162 bar_types: AHashMap::new(),
163 subscriptions,
164 subscription_messages: AHashMap::new(),
165 message_buffer: VecDeque::new(),
166 book_sequence: AHashMap::new(),
167 pending_bars: AHashMap::new(),
168 bars_timestamp_on_close,
169 }
170 }
171
172 async fn send_with_retry(
176 &self,
177 payload: String,
178 rate_limit_keys: Option<&[Ustr]>,
179 ) -> Result<(), DydxWsError> {
180 let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
181 self.retry_manager
182 .execute_with_retry(
183 "websocket_send",
184 || {
185 let payload = payload.clone();
186 let keys = keys_owned.clone();
187 async move {
188 self.client
189 .send_text(payload, keys.as_deref())
190 .await
191 .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
192 }
193 },
194 should_retry_dydx_error,
195 create_dydx_timeout_error,
196 )
197 .await
198 }
199
200 pub async fn run(&mut self) {
208 log::debug!("WebSocket handler started");
209 loop {
210 if !self.message_buffer.is_empty() {
212 let nautilus_msg = self.message_buffer.pop_front().unwrap();
213 if self.out_tx.send(nautilus_msg).is_err() {
214 log::debug!("Receiver dropped, stopping handler");
215 break;
216 }
217 continue;
218 }
219
220 tokio::select! {
221 Some(cmd) = self.cmd_rx.recv() => {
222 self.handle_command(cmd).await;
223 }
224
225 Some(msg) = self.raw_rx.recv() => {
226 log::trace!("Handler received raw message");
227 let nautilus_msgs = self.process_raw_message(msg).await;
228 if !nautilus_msgs.is_empty() {
229 let mut iter = nautilus_msgs.into_iter();
230 let first = iter.next().expect("non-empty vec has first element");
232 self.message_buffer.extend(iter);
233 log::trace!("Handler sending message: {:?}", std::mem::discriminant(&first));
234 if self.out_tx.send(first).is_err() {
235 log::debug!("Receiver dropped, stopping handler");
236 break;
237 }
238 }
239 }
240
241 else => {
242 log::debug!("Handler shutting down: channels closed");
243 break;
244 }
245 }
246
247 if self.signal.load(Ordering::Relaxed) {
248 log::debug!("Handler received stop signal");
249 break;
250 }
251 }
252 }
253
254 async fn process_raw_message(&mut self, msg: Message) -> Vec<NautilusWsMessage> {
256 match msg {
257 Message::Text(txt) => {
258 if txt == RECONNECTED {
259 self.clear_state();
260 if let Err(e) = self.replay_subscriptions().await {
261 log::error!("Failed to replay subscriptions after reconnect: {e}");
262 }
263 return vec![NautilusWsMessage::Reconnected];
264 }
265
266 match serde_json::from_str::<DydxWsFeedMessage>(&txt) {
268 Ok(feed_msg) => {
269 return self.handle_feed_message(feed_msg);
270 }
271 Err(e) => {
272 if txt.contains("v4_subaccounts") {
274 log::warn!(
275 "[WS_DESER] Failed to parse v4_subaccounts as DydxWsFeedMessage: {e}\nRaw: {txt}"
276 );
277 }
278 }
279 }
280
281 match serde_json::from_str::<serde_json::Value>(&txt) {
283 Ok(val) => match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
284 Ok(meta) => {
285 let result = if meta.is_connected() {
286 serde_json::from_value::<DydxWsConnectedMsg>(val)
287 .map(DydxWsMessage::Connected)
288 } else if meta.is_subscribed() {
289 log::debug!("Processing subscribed message via fallback path");
290 if let Ok(sub_msg) =
291 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
292 {
293 if sub_msg.channel == DydxWsChannel::Subaccounts {
294 log::debug!("Parsing subaccounts subscription (fallback)");
295 serde_json::from_value::<DydxWsSubaccountsSubscribed>(val)
296 .map(DydxWsMessage::SubaccountsSubscribed)
297 .or_else(|e| {
298 log::warn!(
299 "Failed to parse subaccounts subscription: {e}"
300 );
301 Ok(DydxWsMessage::Subscribed(sub_msg))
302 })
303 } else {
304 Ok(DydxWsMessage::Subscribed(sub_msg))
305 }
306 } else {
307 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
308 .map(DydxWsMessage::Subscribed)
309 }
310 } else if meta.is_unsubscribed() {
311 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
312 .map(DydxWsMessage::Unsubscribed)
313 } else if meta.is_error() {
314 serde_json::from_value::<DydxWebSocketError>(val)
315 .map(DydxWsMessage::Error)
316 } else if meta.is_unknown() {
317 log::warn!("Received unknown WebSocket message type: {txt}",);
318 Ok(DydxWsMessage::Raw(val))
319 } else {
320 Ok(DydxWsMessage::Raw(val))
321 };
322
323 match result {
324 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
325 Err(e) => {
326 log::error!(
327 "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {txt}",
328 meta.msg_type,
329 meta.channel,
330 );
331 vec![]
332 }
333 }
334 }
335 Err(e) => {
336 log::error!(
337 "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{txt}"
338 );
339 vec![]
340 }
341 },
342 Err(e) => {
343 let err = DydxWebSocketError::from_message(e.to_string());
344 vec![NautilusWsMessage::Error(err)]
345 }
346 }
347 }
348 Message::Pong(_data) => vec![],
349 Message::Ping(_data) => vec![], Message::Binary(_bin) => vec![], Message::Close(_frame) => {
352 log::info!("WebSocket close frame received");
353 vec![]
354 }
355 Message::Frame(_) => vec![],
356 }
357 }
358
359 async fn handle_dydx_message(&mut self, msg: DydxWsMessage) -> Vec<NautilusWsMessage> {
361 match self.handle_message(msg).await {
362 Ok(msgs) => msgs,
363 Err(e) => {
364 log::error!("Error handling message: {e}");
365 vec![]
366 }
367 }
368 }
369
370 fn handle_feed_message(&mut self, feed_msg: DydxWsFeedMessage) -> Vec<NautilusWsMessage> {
372 log::trace!(
373 "Handling feed message: {:?}",
374 std::mem::discriminant(&feed_msg)
375 );
376 match feed_msg {
377 DydxWsFeedMessage::Subaccounts(msg) => self.handle_subaccounts(msg),
378 DydxWsFeedMessage::Orderbook(msg) => self.handle_orderbook(msg),
379 DydxWsFeedMessage::Trades(msg) => self.handle_trades(msg),
380 DydxWsFeedMessage::Markets(msg) => self.handle_markets_feed(msg),
381 DydxWsFeedMessage::Candles(msg) => self.handle_candles_feed(msg),
382 DydxWsFeedMessage::ParentSubaccounts(msg) => self.handle_parent_subaccounts(msg),
383 DydxWsFeedMessage::BlockHeight(msg) => self.handle_block_height_feed(msg),
384 }
385 }
386
387 fn handle_subaccounts(&self, msg: DydxWsSubaccountsMessage) -> Vec<NautilusWsMessage> {
389 match msg {
390 DydxWsSubaccountsMessage::Subscribed(data) => {
391 let topic =
392 self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(data.id.clone()));
393 self.subscriptions.confirm_subscribe(&topic);
394 self.process_subaccounts_subscribed(&data)
395 }
396 DydxWsSubaccountsMessage::ChannelData(data) => {
397 self.process_subaccounts_channel_data(data)
398 }
399 DydxWsSubaccountsMessage::Unsubscribed(data) => {
400 let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &data.id);
401 self.subscriptions.confirm_unsubscribe(&topic);
402 vec![]
403 }
404 }
405 }
406
407 fn handle_orderbook(&mut self, msg: DydxWsOrderbookMessage) -> Vec<NautilusWsMessage> {
409 match msg {
410 DydxWsOrderbookMessage::Subscribed(data) => {
411 let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
412 self.subscriptions.confirm_subscribe(&topic);
413 if let Some(id) = &data.id {
415 self.book_sequence.insert(id.clone(), data.message_id);
416 }
417 self.parse_orderbook_from_data(&data, true)
418 }
419 DydxWsOrderbookMessage::ChannelData(data) => {
420 if let Some(id) = &data.id {
421 if let Some(last_id) = self.book_sequence.get(id)
422 && data.message_id <= *last_id
423 {
424 log::warn!(
425 "Orderbook sequence regression for {id}: last {last_id}, received {}",
426 data.message_id
427 );
428 }
429 self.book_sequence.insert(id.clone(), data.message_id);
430 }
431 self.parse_orderbook_from_data(&data, false)
432 }
433 DydxWsOrderbookMessage::ChannelBatchData(data) => {
434 if let Some(id) = &data.id {
435 if let Some(last_id) = self.book_sequence.get(id)
436 && data.message_id <= *last_id
437 {
438 log::warn!(
439 "Orderbook batch sequence regression for {id}: last {last_id}, received {}",
440 data.message_id
441 );
442 }
443 self.book_sequence.insert(id.clone(), data.message_id);
444 }
445 self.parse_orderbook_batch_from_data(&data)
446 }
447 DydxWsOrderbookMessage::Unsubscribed(data) => {
448 let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
449 self.subscriptions.confirm_unsubscribe(&topic);
450 if let Some(id) = &data.id {
451 self.book_sequence.remove(id);
452 }
453 vec![]
454 }
455 }
456 }
457
458 fn handle_trades(&self, msg: DydxWsTradesMessage) -> Vec<NautilusWsMessage> {
460 match msg {
461 DydxWsTradesMessage::Subscribed(data) => {
462 let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
463 self.subscriptions.confirm_subscribe(&topic);
464 self.parse_trades_from_data(&data)
465 }
466 DydxWsTradesMessage::ChannelData(data) => self.parse_trades_from_data(&data),
467 DydxWsTradesMessage::Unsubscribed(data) => {
468 let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
469 self.subscriptions.confirm_unsubscribe(&topic);
470 vec![]
471 }
472 }
473 }
474
475 fn handle_markets_feed(&self, msg: DydxWsMarketsMessage) -> Vec<NautilusWsMessage> {
477 match msg {
478 DydxWsMarketsMessage::Subscribed(data) => {
479 let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
480 self.subscriptions.confirm_subscribe(&topic);
481 self.parse_markets_from_data(&data)
482 }
483 DydxWsMarketsMessage::ChannelData(data) => self.parse_markets_from_data(&data),
484 DydxWsMarketsMessage::Unsubscribed(data) => {
485 let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
486 self.subscriptions.confirm_unsubscribe(&topic);
487 vec![]
488 }
489 }
490 }
491
492 fn handle_candles_feed(&mut self, msg: DydxWsCandlesMessage) -> Vec<NautilusWsMessage> {
497 match msg {
498 DydxWsCandlesMessage::Subscribed(data) => {
499 let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
500 self.subscriptions.confirm_subscribe(&topic);
501 vec![]
502 }
503 DydxWsCandlesMessage::ChannelData(data) => self.parse_candles_from_data(&data),
504 DydxWsCandlesMessage::Unsubscribed(data) => {
505 let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
506 self.subscriptions.confirm_unsubscribe(&topic);
507 vec![]
508 }
509 }
510 }
511
512 fn handle_parent_subaccounts(
514 &self,
515 msg: DydxWsParentSubaccountsMessage,
516 ) -> Vec<NautilusWsMessage> {
517 match msg {
518 DydxWsParentSubaccountsMessage::Subscribed(data) => {
519 let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
520 self.subscriptions.confirm_subscribe(&topic);
521 self.parse_parent_subaccounts_from_data(&data)
522 }
523 DydxWsParentSubaccountsMessage::ChannelData(data) => {
524 self.parse_parent_subaccounts_from_data(&data)
525 }
526 DydxWsParentSubaccountsMessage::Unsubscribed(data) => {
527 let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
528 self.subscriptions.confirm_unsubscribe(&topic);
529 vec![]
530 }
531 }
532 }
533
534 fn handle_block_height_feed(&self, msg: DydxWsBlockHeightMessage) -> Vec<NautilusWsMessage> {
536 match msg {
537 DydxWsBlockHeightMessage::Subscribed(data) => {
538 let topic =
539 self.topic_from_msg(&DydxWsChannel::BlockHeight, &Some(data.id.clone()));
540 self.subscriptions.confirm_subscribe(&topic);
541 match data.contents.height.parse::<u64>() {
542 Ok(height) => vec![NautilusWsMessage::BlockHeight {
543 height,
544 time: data.contents.time,
545 }],
546 Err(e) => {
547 log::warn!("Failed to parse block height from subscription: {e}");
548 vec![]
549 }
550 }
551 }
552 DydxWsBlockHeightMessage::ChannelData(data) => {
553 match data.contents.block_height.parse::<u64>() {
554 Ok(height) => vec![NautilusWsMessage::BlockHeight {
555 height,
556 time: data.contents.time,
557 }],
558 Err(e) => {
559 log::warn!("Failed to parse block height from channel data: {e}");
560 vec![]
561 }
562 }
563 }
564 DydxWsBlockHeightMessage::Unsubscribed(data) => {
565 let topic = self.topic_from_msg(&DydxWsChannel::BlockHeight, &data.id);
566 self.subscriptions.confirm_unsubscribe(&topic);
567 vec![]
568 }
569 }
570 }
571
572 fn process_subaccounts_subscribed(
574 &self,
575 msg: &DydxWsSubaccountsSubscribed,
576 ) -> Vec<NautilusWsMessage> {
577 log::debug!("Forwarding subaccount subscription to execution client");
578 vec![NautilusWsMessage::SubaccountSubscribed(Box::new(
579 msg.clone(),
580 ))]
581 }
582
583 fn process_subaccounts_channel_data(
585 &self,
586 data: DydxWsSubaccountsChannelData,
587 ) -> Vec<NautilusWsMessage> {
588 let has_orders = data.contents.orders.as_ref().is_some_and(|o| !o.is_empty());
589 let has_fills = data.contents.fills.as_ref().is_some_and(|f| !f.is_empty());
590
591 if has_orders || has_fills {
592 log::debug!(
593 "Received {} order(s), {} fill(s) - forwarding to execution client",
594 data.contents.orders.as_ref().map_or(0, |o| o.len()),
595 data.contents.fills.as_ref().map_or(0, |f| f.len())
596 );
597 vec![NautilusWsMessage::SubaccountsChannelData(Box::new(data))]
598 } else {
599 vec![]
600 }
601 }
602
603 fn parse_trades_from_data(&self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
605 match self.parse_trades(data) {
606 Ok(msgs) => msgs,
607 Err(e) => {
608 log::error!("Error parsing trades: {e}");
609 vec![]
610 }
611 }
612 }
613
614 fn parse_orderbook_from_data(
616 &mut self,
617 data: &DydxWsChannelDataMsg,
618 is_snapshot: bool,
619 ) -> Vec<NautilusWsMessage> {
620 match self.parse_orderbook(data, is_snapshot) {
621 Ok(msgs) => msgs,
622 Err(e) => {
623 log::error!("Error parsing orderbook: {e}");
624 vec![]
625 }
626 }
627 }
628
629 fn parse_orderbook_batch_from_data(
631 &mut self,
632 data: &DydxWsChannelBatchDataMsg,
633 ) -> Vec<NautilusWsMessage> {
634 match self.parse_orderbook_batch(data) {
635 Ok(msgs) => msgs,
636 Err(e) => {
637 log::error!("Error parsing orderbook batch: {e}");
638 vec![]
639 }
640 }
641 }
642
643 fn parse_markets_from_data(&self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
645 match self.parse_markets(data) {
646 Ok(msgs) => msgs,
647 Err(e) => {
648 log::error!("Error parsing markets: {e}");
649 vec![]
650 }
651 }
652 }
653
654 fn parse_candles_from_data(&mut self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
656 match self.parse_candles(data) {
657 Ok(msgs) => msgs,
658 Err(e) => {
659 log::error!("Error parsing candles: {e}");
660 vec![]
661 }
662 }
663 }
664
665 fn parse_parent_subaccounts_from_data(
667 &self,
668 data: &DydxWsChannelDataMsg,
669 ) -> Vec<NautilusWsMessage> {
670 match self.parse_subaccounts(data) {
671 Ok(msgs) => msgs,
672 Err(e) => {
673 log::error!("Error parsing parent subaccounts: {e}");
674 vec![]
675 }
676 }
677 }
678
679 async fn handle_command(&mut self, command: HandlerCommand) {
681 match command {
682 HandlerCommand::UpdateInstrument(instrument) => {
683 let symbol = instrument.id().symbol.inner();
684 self.instruments.insert(symbol, *instrument);
685 }
686 HandlerCommand::InitializeInstruments(instruments) => {
687 log::debug!(
688 "Initializing {} instruments in WebSocket handler",
689 instruments.len()
690 );
691 for instrument in instruments {
692 let symbol = instrument.id().symbol.inner();
693 self.instruments.insert(symbol, instrument);
694 }
695 log::debug!(
696 "Handler now has {} instruments cached",
697 self.instruments.len()
698 );
699 }
700 HandlerCommand::RegisterBarType { topic, bar_type } => {
701 self.bar_types.insert(topic, bar_type);
702 }
703 HandlerCommand::UnregisterBarType { topic } => {
704 self.bar_types.remove(&topic);
705 }
706 HandlerCommand::RegisterSubscription {
707 topic,
708 subscription,
709 } => {
710 self.subscription_messages.insert(topic, subscription);
711 }
712 HandlerCommand::UnregisterSubscription { topic } => {
713 self.subscription_messages.remove(&topic);
714 }
715 HandlerCommand::SendText(text) => {
716 if let Err(e) = self
717 .send_with_retry(text, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
718 .await
719 {
720 log::error!("Failed to send WebSocket text after retries: {e}");
721 }
722 }
723 }
724 }
725
726 pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
728 self.bar_types.insert(topic, bar_type);
729 }
730
731 pub fn unregister_bar_type(&mut self, topic: &str) {
733 self.bar_types.remove(topic);
734 }
735
736 fn topic_from_msg(&self, channel: &DydxWsChannel, id: &Option<String>) -> String {
737 match id {
738 Some(id) => format!(
739 "{}{}{}",
740 channel.as_ref(),
741 self.subscriptions.delimiter(),
742 id
743 ),
744 None => channel.as_ref().to_string(),
745 }
746 }
747
748 fn clear_state(&mut self) {
753 let buffer_count = self.message_buffer.len();
754 let seq_count = self.book_sequence.len();
755 let bars_count = self.pending_bars.len();
756 self.message_buffer.clear();
757 self.book_sequence.clear();
758 self.pending_bars.clear();
759 log::debug!(
760 "Cleared reconnect state: message_buffer={buffer_count}, \
761 book_sequence={seq_count}, pending_bars={bars_count}"
762 );
763 }
764
765 async fn replay_subscriptions(&self) -> DydxWsResult<()> {
766 let topics = self.subscriptions.all_topics();
767 for topic in topics {
768 let Some(subscription) = self.subscription_messages.get(&topic).cloned() else {
769 log::warn!("No preserved subscription message for topic: {topic}");
770 continue;
771 };
772
773 let payload = serde_json::to_string(&subscription)?;
774 self.subscriptions.mark_subscribe(&topic);
775
776 if let Err(e) = self
777 .send_with_retry(payload, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
778 .await
779 {
780 self.subscriptions.mark_failure(&topic);
781 return Err(e);
782 }
783 }
784
785 Ok(())
786 }
787
788 #[allow(clippy::result_large_err)]
796 pub async fn handle_message(
797 &mut self,
798 msg: DydxWsMessage,
799 ) -> DydxWsResult<Vec<NautilusWsMessage>> {
800 match msg {
801 DydxWsMessage::Connected(_) => {
802 log::info!("dYdX WebSocket connected");
803 Ok(vec![])
804 }
805 DydxWsMessage::Subscribed(sub) => {
806 log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
807 let topic = self.topic_from_msg(&sub.channel, &sub.id);
808 self.subscriptions.confirm_subscribe(&topic);
809 Ok(vec![])
810 }
811 DydxWsMessage::SubaccountsSubscribed(msg) => {
812 log::debug!("Subaccounts subscribed with initial state (fallback path)");
813 let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(msg.id.clone()));
814 self.subscriptions.confirm_subscribe(&topic);
815 Ok(self.process_subaccounts_subscribed(&msg))
816 }
817 DydxWsMessage::Unsubscribed(unsub) => {
818 log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
819 let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
820 self.subscriptions.confirm_unsubscribe(&topic);
821 Ok(vec![])
822 }
823 DydxWsMessage::Error(err) => Ok(vec![NautilusWsMessage::Error(err)]),
824 DydxWsMessage::Reconnected => {
825 self.clear_state();
826 if let Err(e) = self.replay_subscriptions().await {
827 log::error!("Failed to replay subscriptions after reconnect message: {e}");
828 }
829 Ok(vec![NautilusWsMessage::Reconnected])
830 }
831 DydxWsMessage::Pong => Ok(vec![]),
832 DydxWsMessage::Raw(_) => Ok(vec![]),
833 }
834 }
835
836 fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Vec<NautilusWsMessage>> {
837 let symbol = data
838 .id
839 .as_ref()
840 .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
841
842 let instrument_id = self.parse_instrument_id(symbol)?;
843 let instrument = self.get_instrument(&instrument_id)?;
844
845 let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
846 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
847
848 let ts_init = get_atomic_clock_realtime().get_time_ns();
849 let ticks = ws_parse::parse_trade_ticks(instrument_id, instrument, &contents, ts_init)?;
850
851 if ticks.is_empty() {
852 Ok(vec![])
853 } else {
854 Ok(vec![NautilusWsMessage::Data(ticks)])
855 }
856 }
857
858 fn parse_orderbook(
859 &mut self,
860 data: &DydxWsChannelDataMsg,
861 is_snapshot: bool,
862 ) -> DydxWsResult<Vec<NautilusWsMessage>> {
863 let symbol = data
864 .id
865 .as_ref()
866 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
867
868 let instrument_id = self.parse_instrument_id(symbol)?;
869 let instrument = self.get_instrument(&instrument_id)?;
870 let price_prec = instrument.price_precision();
871 let size_prec = instrument.size_precision();
872
873 let ts_init = get_atomic_clock_realtime().get_time_ns();
874
875 if is_snapshot {
876 let contents: DydxOrderbookSnapshotContents =
877 serde_json::from_value(data.contents.clone()).map_err(|e| {
878 DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
879 })?;
880
881 let deltas = ws_parse::parse_orderbook_snapshot(
882 &instrument_id,
883 &contents,
884 price_prec,
885 size_prec,
886 ts_init,
887 )?;
888
889 Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
890 } else {
891 let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
892 .map_err(|e| {
893 DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
894 })?;
895
896 let deltas = ws_parse::parse_orderbook_deltas(
897 &instrument_id,
898 &contents,
899 price_prec,
900 size_prec,
901 ts_init,
902 )?;
903
904 Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
905 }
906 }
907
908 fn parse_orderbook_batch(
909 &mut self,
910 data: &DydxWsChannelBatchDataMsg,
911 ) -> DydxWsResult<Vec<NautilusWsMessage>> {
912 let symbol = data
913 .id
914 .as_ref()
915 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
916
917 let instrument_id = self.parse_instrument_id(symbol)?;
918 let instrument = self.get_instrument(&instrument_id)?;
919 let price_prec = instrument.price_precision();
920 let size_prec = instrument.size_precision();
921
922 let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
923 .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
924
925 let ts_init = get_atomic_clock_realtime().get_time_ns();
926 let mut all_deltas = Vec::new();
927
928 let num_messages = contents.len();
929 for (idx, content) in contents.iter().enumerate() {
930 let is_last_message = idx == num_messages - 1;
931 let deltas = ws_parse::parse_orderbook_deltas_with_flag(
932 &instrument_id,
933 content,
934 price_prec,
935 size_prec,
936 ts_init,
937 is_last_message,
938 )?;
939 all_deltas.extend(deltas);
940 }
941
942 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
943 Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
944 }
945
946 fn parse_candles(
947 &mut self,
948 data: &DydxWsChannelDataMsg,
949 ) -> DydxWsResult<Vec<NautilusWsMessage>> {
950 let topic = data
951 .id
952 .as_ref()
953 .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
954
955 let bar_type = *self.bar_types.get(topic).ok_or_else(|| {
956 DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
957 })?;
958
959 let candle: DydxCandle = serde_json::from_value(data.contents.clone())
960 .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
961
962 let instrument_id = self.parse_instrument_id(&candle.ticker)?;
963 let instrument = self.get_instrument(&instrument_id)?;
964
965 let ts_init = get_atomic_clock_realtime().get_time_ns();
966 let bar = ws_parse::parse_candle_bar(
967 bar_type,
968 instrument,
969 &candle,
970 self.bars_timestamp_on_close,
971 ts_init,
972 )?;
973
974 let topic_key = topic.clone();
977 if let Some(pending) = self.pending_bars.get(&topic_key) {
978 if pending.ts_event != bar.ts_event {
979 let closed_bar = *pending;
981 self.pending_bars.insert(topic_key, bar);
982 return Ok(vec![NautilusWsMessage::Data(vec![Data::Bar(closed_bar)])]);
983 }
984 self.pending_bars.insert(topic_key, bar);
986 return Ok(vec![]);
987 }
988
989 self.pending_bars.insert(topic_key, bar);
991 Ok(vec![])
992 }
993
994 fn parse_markets(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Vec<NautilusWsMessage>> {
995 let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
996 .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
997
998 let mut messages = Vec::new();
999 let ts_init = get_atomic_clock_realtime().get_time_ns();
1000
1001 if let Some(oracle_prices) = &contents.oracle_prices {
1003 for (symbol_str, oracle_market) in oracle_prices {
1004 let Ok(instrument_id) = self.parse_instrument_id(symbol_str) else {
1005 continue;
1006 };
1007 let Some(instrument) = self.instruments.get(&instrument_id.symbol.inner()) else {
1008 continue;
1009 };
1010 let Ok(oracle_price_dec) = oracle_market.oracle_price.parse::<Decimal>() else {
1011 log::error!("Failed to parse oracle price: market={symbol_str}");
1012 continue;
1013 };
1014 let Ok(price) =
1015 Price::from_decimal_dp(oracle_price_dec, instrument.price_precision())
1016 else {
1017 log::error!("Failed to create Price: market={symbol_str}");
1018 continue;
1019 };
1020
1021 messages.push(NautilusWsMessage::MarkPrice(MarkPriceUpdate::new(
1022 instrument_id,
1023 price,
1024 ts_init,
1025 ts_init,
1026 )));
1027 messages.push(NautilusWsMessage::IndexPrice(IndexPriceUpdate::new(
1028 instrument_id,
1029 price,
1030 ts_init,
1031 ts_init,
1032 )));
1033 }
1034 }
1035
1036 if let Some(trading) = &contents.trading {
1038 for (symbol_str, trading_data) in trading {
1039 let Ok(instrument_id) = self.parse_instrument_id(symbol_str) else {
1040 continue;
1041 };
1042
1043 if !self.instruments.contains_key(&instrument_id.symbol.inner()) {
1045 log::info!("New instrument discovered via WebSocket: {symbol_str}");
1046 messages.push(NautilusWsMessage::NewInstrumentDiscovered {
1047 ticker: symbol_str.clone(),
1048 });
1049 continue;
1050 }
1051
1052 let Some(rate_str) = &trading_data.next_funding_rate else {
1054 continue;
1055 };
1056 let Ok(rate) = rate_str.parse::<Decimal>() else {
1057 log::error!(
1058 "Failed to parse funding rate: market={symbol_str}, rate={rate_str}"
1059 );
1060 continue;
1061 };
1062
1063 messages.push(NautilusWsMessage::FundingRate(FundingRateUpdate::new(
1064 instrument_id,
1065 rate,
1066 None,
1067 ts_init,
1068 ts_init,
1069 )));
1070 }
1071 }
1072
1073 Ok(messages)
1074 }
1075
1076 fn parse_subaccounts(
1077 &self,
1078 data: &DydxWsChannelDataMsg,
1079 ) -> DydxWsResult<Vec<NautilusWsMessage>> {
1080 log::debug!(
1081 "Parsing subaccounts channel data (msg_type={:?})",
1082 data.msg_type
1083 );
1084 let contents: DydxWsSubaccountsChannelContents =
1085 serde_json::from_value(data.contents.clone()).map_err(|e| {
1086 DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1087 })?;
1088
1089 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1090 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1091
1092 if has_orders || has_fills {
1093 log::debug!(
1096 "Received {} order(s), {} fill(s) - forwarding to execution client",
1097 contents.orders.as_ref().map_or(0, |o| o.len()),
1098 contents.fills.as_ref().map_or(0, |f| f.len())
1099 );
1100
1101 let channel_data = DydxWsSubaccountsChannelData {
1102 connection_id: data.connection_id.clone(),
1103 message_id: data.message_id,
1104 id: data.id.clone().unwrap_or_default(),
1105 version: data.version.clone().unwrap_or_default(),
1106 contents,
1107 };
1108
1109 return Ok(vec![NautilusWsMessage::SubaccountsChannelData(Box::new(
1110 channel_data,
1111 ))]);
1112 }
1113
1114 Ok(vec![])
1115 }
1116
1117 fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1118 let symbol_with_perp = format!("{symbol}-PERP");
1121 Ok(parse_instrument_id(&symbol_with_perp))
1122 }
1123
1124 fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1125 self.instruments
1126 .get(&instrument_id.symbol.inner())
1127 .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1128 }
1129}
1130
1131fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1133 match error {
1134 DydxWsError::Transport(_) => true,
1135 DydxWsError::Send(_) => true,
1136 DydxWsError::ClientError(msg) => {
1137 let msg_lower = msg.to_lowercase();
1138 msg_lower.contains("timeout")
1139 || msg_lower.contains("timed out")
1140 || msg_lower.contains("connection")
1141 || msg_lower.contains("network")
1142 }
1143 DydxWsError::NotConnected
1144 | DydxWsError::Json(_)
1145 | DydxWsError::Parse(_)
1146 | DydxWsError::Authentication(_)
1147 | DydxWsError::Subscription(_)
1148 | DydxWsError::Venue(_) => false,
1149 }
1150}
1151
1152fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1154 DydxWsError::ClientError(msg)
1155}