nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! This module processes incoming WebSocket messages and converts them into
19//! Nautilus domain objects.
20//!
21//! The handler owns the WebSocketClient exclusively and runs in a dedicated
22//! Tokio task within the lock-free I/O boundary.
23
24use std::{
25    fmt::Debug,
26    str::FromStr,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36    data::{
37        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38        bar::get_bar_interval_ns,
39    },
40    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41    identifiers::{AccountId, InstrumentId, TradeId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{
46    RECONNECTED,
47    retry::{RetryManager, create_websocket_retry_manager},
48    websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55    DydxWsError, DydxWsResult,
56    client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57    enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58    error::DydxWebSocketError,
59    messages::{
60        DydxBlockHeightChannelContents, DydxCandle, DydxMarketsContents, DydxOrderbookContents,
61        DydxOrderbookSnapshotContents, DydxTradeContents, DydxWsBlockHeightMessage,
62        DydxWsCandlesMessage, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
63        DydxWsFeedMessage, DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
64        DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage,
65        DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg, DydxWsTradesMessage,
66    },
67};
68use crate::common::parse::parse_instrument_id;
69
/// Commands sent to the feed handler.
///
/// Commands arrive on the handler's command channel and are applied by
/// `FeedHandler::handle_command` inside the handler task.
#[derive(Debug, Clone)]
pub enum HandlerCommand {
    /// Update a single instrument in the cache (keyed by the instrument's symbol).
    UpdateInstrument(Box<InstrumentAny>),
    /// Initialize instruments in bulk (each keyed by its symbol).
    InitializeInstruments(Vec<InstrumentAny>),
    /// Register a bar type for candle subscriptions.
    RegisterBarType { topic: String, bar_type: BarType },
    /// Unregister a bar type for candle subscriptions.
    UnregisterBarType { topic: String },
    /// Send a text message via WebSocket.
    ///
    /// The handler sends it with retries under the subscription rate-limit key.
    SendText(String),
}
84
/// Processes incoming WebSocket messages and converts them to Nautilus domain objects.
///
/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
/// eliminating RwLock contention on the hot path.
///
/// Inputs arrive on two channels (`cmd_rx` for commands, `raw_rx` for raw frames);
/// parsed results are published on `out_tx`.
pub struct FeedHandler {
    /// Account ID for parsing account-specific messages.
    account_id: Option<AccountId>,
    /// Command receiver from outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Output sender for Nautilus messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Raw WebSocket message receiver.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Owned WebSocket client (no RwLock).
    client: WebSocketClient,
    /// Manual disconnect signal; the run loop exits once it observes this set.
    signal: Arc<AtomicBool>,
    /// Retry manager for WebSocket send operations.
    retry_manager: RetryManager<DydxWsError>,
    /// Cached instruments for parsing market data, keyed by symbol.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Cached bar types by topic (e.g., "BTC-USD/1MIN").
    bar_types: AHashMap<String, BarType>,
    /// Subscription state shared with the outer client for replay/acks.
    subscriptions: SubscriptionState,
}
111
112impl Debug for FeedHandler {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("FeedHandler")
115            .field("account_id", &self.account_id)
116            .field("instruments_count", &self.instruments.len())
117            .field("bar_types_count", &self.bar_types.len())
118            .finish_non_exhaustive()
119    }
120}
121
122impl FeedHandler {
123    /// Creates a new [`FeedHandler`].
124    #[must_use]
125    pub fn new(
126        account_id: Option<AccountId>,
127        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
128        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
129        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
130        client: WebSocketClient,
131        signal: Arc<AtomicBool>,
132        subscriptions: SubscriptionState,
133    ) -> Self {
134        Self {
135            account_id,
136            cmd_rx,
137            out_tx,
138            raw_rx,
139            client,
140            signal,
141            retry_manager: create_websocket_retry_manager(),
142            instruments: AHashMap::new(),
143            bar_types: AHashMap::new(),
144            subscriptions,
145        }
146    }
147
148    /// Sends a WebSocket message with retry logic.
149    ///
150    /// Uses the configured [`RetryManager`] to handle transient failures.
151    async fn send_with_retry(
152        &self,
153        payload: String,
154        rate_limit_keys: Option<Vec<String>>,
155    ) -> Result<(), DydxWsError> {
156        self.retry_manager
157            .execute_with_retry(
158                "websocket_send",
159                || {
160                    let payload = payload.clone();
161                    let keys = rate_limit_keys.clone();
162                    async move {
163                        self.client
164                            .send_text(payload, keys)
165                            .await
166                            .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
167                    }
168                },
169                should_retry_dydx_error,
170                create_dydx_timeout_error,
171            )
172            .await
173    }
174
    /// Main processing loop for the handler.
    ///
    /// Each iteration waits on the command channel and the raw-message channel and
    /// services one event. The loop exits when:
    /// - forwarding a parsed message fails because the output receiver was dropped,
    /// - neither channel yields a value any more (the `else` branch), or
    /// - the stop signal is observed set.
    ///
    /// The stop signal is only checked after an event has been handled, so an idle
    /// handler does not notice it until the next command or message arrives.
    pub async fn run(&mut self) {
        loop {
            tokio::select! {
                // Commands from the outer client (cache updates, outbound sends)
                Some(cmd) = self.cmd_rx.recv() => {
                    self.handle_command(cmd).await;
                }

                // Raw frames from the socket; only parsed results are forwarded
                Some(msg) = self.raw_rx.recv() => {
                    if let Some(nautilus_msg) = self.process_raw_message(msg).await
                        && self.out_tx.send(nautilus_msg).is_err()
                    {
                        tracing::debug!("Receiver dropped, stopping handler");
                        break;
                    }
                }

                else => {
                    tracing::debug!("Handler shutting down: channels closed");
                    break;
                }
            }

            if self.signal.load(Ordering::Relaxed) {
                tracing::debug!("Handler received stop signal");
                break;
            }
        }
    }
204
205    /// Processes a raw WebSocket message.
206    async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
207        match msg {
208            Message::Text(txt) => {
209                if txt == RECONNECTED {
210                    if let Err(e) = self.replay_subscriptions().await {
211                        tracing::error!("Failed to replay subscriptions after reconnect: {e}");
212                    }
213                    return Some(NautilusWsMessage::Reconnected);
214                }
215
216                match serde_json::from_str::<serde_json::Value>(&txt) {
217                    Ok(val) => {
218                        let val_clone = val.clone();
219
220                        // Try two-level parsing first (channel → type)
221                        if let Ok(feed_msg) =
222                            serde_json::from_value::<DydxWsFeedMessage>(val.clone())
223                        {
224                            return self.handle_feed_message(feed_msg).await;
225                        }
226
227                        // Fall back to single-level parsing for non-channel messages
228                        // (connected, error, subscribed/unsubscribed without channel data)
229                        match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
230                            Ok(meta) => {
231                                let result = if meta.is_connected() {
232                                    serde_json::from_value::<DydxWsConnectedMsg>(val)
233                                        .map(DydxWsMessage::Connected)
234                                } else if meta.is_subscribed() {
235                                    if let Ok(sub_msg) =
236                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
237                                    {
238                                        if sub_msg.channel == DydxWsChannel::Subaccounts {
239                                            serde_json::from_value::<DydxWsSubaccountsSubscribed>(
240                                                val.clone(),
241                                            )
242                                            .map(DydxWsMessage::SubaccountsSubscribed)
243                                            .or_else(|e| {
244                                                tracing::debug!(
245                                                    "Failed to parse subaccounts subscription (fallback to standard): {e}. Raw: {}",
246                                                    serde_json::to_string(&val_clone)
247                                                        .unwrap_or_else(|_| "<invalid json>".into())
248                                                );
249                                                Ok(DydxWsMessage::Subscribed(sub_msg))
250                                            })
251                                        } else {
252                                            Ok(DydxWsMessage::Subscribed(sub_msg))
253                                        }
254                                    } else {
255                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val)
256                                            .map(DydxWsMessage::Subscribed)
257                                    }
258                                } else if meta.is_unsubscribed() {
259                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
260                                        .map(DydxWsMessage::Unsubscribed)
261                                } else if meta.is_error() {
262                                    serde_json::from_value::<DydxWebSocketError>(val)
263                                        .map(DydxWsMessage::Error)
264                                } else if meta.is_unknown() {
265                                    tracing::warn!(
266                                        "Received unknown WebSocket message type: {}",
267                                        serde_json::to_string(&val_clone)
268                                            .unwrap_or_else(|_| "<invalid json>".into())
269                                    );
270                                    Ok(DydxWsMessage::Raw(val))
271                                } else {
272                                    Ok(DydxWsMessage::Raw(val))
273                                };
274
275                                match result {
276                                    Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
277                                    Err(e) => {
278                                        tracing::error!(
279                                            "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {}",
280                                            meta.msg_type,
281                                            meta.channel,
282                                            serde_json::to_string(&val_clone)
283                                                .unwrap_or_else(|_| "<invalid json>".into())
284                                        );
285                                        None
286                                    }
287                                }
288                            }
289                            Err(e) => {
290                                let raw_json = serde_json::to_string_pretty(&val_clone)
291                                    .unwrap_or_else(|_| format!("{val_clone:?}"));
292                                tracing::error!(
293                                    "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{}",
294                                    raw_json
295                                );
296                                None
297                            }
298                        }
299                    }
300                    Err(e) => {
301                        let err = DydxWebSocketError::from_message(e.to_string());
302                        Some(NautilusWsMessage::Error(err))
303                    }
304                }
305            }
306            Message::Pong(_data) => None,
307            Message::Ping(_data) => None,  // Handled by lower layers
308            Message::Binary(_bin) => None, // dYdX uses text frames
309            Message::Close(_frame) => {
310                tracing::info!("WebSocket close frame received");
311                None
312            }
313            Message::Frame(_) => None,
314        }
315    }
316
317    /// Handles a parsed dYdX WebSocket message.
318    async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
319        match self.handle_message(msg).await {
320            Ok(opt_msg) => opt_msg,
321            Err(e) => {
322                tracing::error!("Error handling message: {e}");
323                None
324            }
325        }
326    }
327
328    /// Handles a two-level channel-tagged feed message.
329    async fn handle_feed_message(&self, feed_msg: DydxWsFeedMessage) -> Option<NautilusWsMessage> {
330        match feed_msg {
331            DydxWsFeedMessage::Subaccounts(msg) => match msg {
332                DydxWsSubaccountsMessage::Subscribed(data) => {
333                    self.handle_dydx_message(DydxWsMessage::SubaccountsSubscribed(data))
334                        .await
335                }
336                DydxWsSubaccountsMessage::ChannelData(data) => {
337                    self.handle_dydx_message(DydxWsMessage::ChannelData(DydxWsChannelDataMsg {
338                        msg_type: data.msg_type,
339                        connection_id: data.connection_id,
340                        message_id: data.message_id,
341                        channel: data.channel,
342                        id: Some(data.id),
343                        contents: serde_json::to_value(&data.contents)
344                            .unwrap_or(serde_json::Value::Null),
345                        version: Some(data.version),
346                    }))
347                    .await
348                }
349            },
350            DydxWsFeedMessage::Orderbook(msg) => match msg {
351                DydxWsOrderbookMessage::Subscribed(data)
352                | DydxWsOrderbookMessage::ChannelData(data) => {
353                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
354                        .await
355                }
356                DydxWsOrderbookMessage::ChannelBatchData(data) => {
357                    self.handle_dydx_message(DydxWsMessage::ChannelBatchData(data))
358                        .await
359                }
360            },
361            DydxWsFeedMessage::Trades(msg) => match msg {
362                DydxWsTradesMessage::Subscribed(data) | DydxWsTradesMessage::ChannelData(data) => {
363                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
364                        .await
365                }
366            },
367            DydxWsFeedMessage::Markets(msg) => match msg {
368                DydxWsMarketsMessage::Subscribed(data)
369                | DydxWsMarketsMessage::ChannelData(data) => {
370                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
371                        .await
372                }
373            },
374            DydxWsFeedMessage::Candles(msg) => match msg {
375                DydxWsCandlesMessage::Subscribed(data)
376                | DydxWsCandlesMessage::ChannelData(data) => {
377                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
378                        .await
379                }
380            },
381            DydxWsFeedMessage::ParentSubaccounts(msg) => match msg {
382                super::messages::DydxWsParentSubaccountsMessage::Subscribed(data)
383                | super::messages::DydxWsParentSubaccountsMessage::ChannelData(data) => {
384                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
385                        .await
386                }
387            },
388            DydxWsFeedMessage::BlockHeight(msg) => match msg {
389                DydxWsBlockHeightMessage::Subscribed(data) => {
390                    // Subscribed message uses "height" field, parse directly
391                    match data.contents.height.parse::<u64>() {
392                        Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
393                        Err(e) => {
394                            tracing::warn!("Failed to parse block height from subscription: {e}");
395                            None
396                        }
397                    }
398                }
399                DydxWsBlockHeightMessage::ChannelData(data) => {
400                    // Channel data uses "blockHeight" field, parse directly
401                    match data.contents.block_height.parse::<u64>() {
402                        Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
403                        Err(e) => {
404                            tracing::warn!("Failed to parse block height from channel data: {e}");
405                            None
406                        }
407                    }
408                }
409            },
410        }
411    }
412
413    /// Handles a command to update the internal state.
414    async fn handle_command(&mut self, command: HandlerCommand) {
415        match command {
416            HandlerCommand::UpdateInstrument(instrument) => {
417                let symbol = instrument.id().symbol.inner();
418                self.instruments.insert(symbol, *instrument);
419            }
420            HandlerCommand::InitializeInstruments(instruments) => {
421                for instrument in instruments {
422                    let symbol = instrument.id().symbol.inner();
423                    self.instruments.insert(symbol, instrument);
424                }
425            }
426            HandlerCommand::RegisterBarType { topic, bar_type } => {
427                self.bar_types.insert(topic, bar_type);
428            }
429            HandlerCommand::UnregisterBarType { topic } => {
430                self.bar_types.remove(&topic);
431            }
432            HandlerCommand::SendText(text) => {
433                if let Err(e) = self
434                    .send_with_retry(
435                        text,
436                        Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
437                    )
438                    .await
439                {
440                    tracing::error!("Failed to send WebSocket text after retries: {e}");
441                }
442            }
443        }
444    }
445
446    /// Registers a bar type for a specific topic (e.g., "BTC-USD/1MIN").
447    pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
448        self.bar_types.insert(topic, bar_type);
449    }
450
451    /// Unregisters a bar type for a specific topic.
452    pub fn unregister_bar_type(&mut self, topic: &str) {
453        self.bar_types.remove(topic);
454    }
455
456    fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
457        match id {
458            Some(id) => format!(
459                "{}{}{}",
460                channel.as_ref(),
461                self.subscriptions.delimiter(),
462                id
463            ),
464            None => channel.as_ref().to_string(),
465        }
466    }
467
468    fn subscription_from_topic(
469        &self,
470        topic: &str,
471        op: super::enums::DydxWsOperation,
472    ) -> Option<super::messages::DydxSubscription> {
473        let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
474            topic,
475            self.subscriptions.delimiter(),
476        );
477        let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
478        let id = symbol.map(std::string::ToString::to_string);
479
480        Some(super::messages::DydxSubscription { op, channel, id })
481    }
482
483    async fn replay_subscriptions(&self) -> DydxWsResult<()> {
484        let topics = self.subscriptions.all_topics();
485        for topic in topics {
486            let Some(subscription) =
487                self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
488            else {
489                tracing::warn!("Failed to reconstruct subscription from topic: {topic}");
490                continue;
491            };
492
493            let payload = serde_json::to_string(&subscription)?;
494            self.subscriptions.mark_subscribe(&topic);
495
496            if let Err(e) = self
497                .send_with_retry(
498                    payload,
499                    Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
500                )
501                .await
502            {
503                self.subscriptions.mark_failure(&topic);
504                return Err(e);
505            }
506        }
507
508        Ok(())
509    }
510
511    /// Processes a WebSocket message and converts it to Nautilus domain objects.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if message parsing fails.
516    #[allow(clippy::result_large_err)]
517    pub async fn handle_message(
518        &self,
519        msg: DydxWsMessage,
520    ) -> DydxWsResult<Option<NautilusWsMessage>> {
521        match msg {
522            DydxWsMessage::Connected(_) => {
523                tracing::info!("dYdX WebSocket connected");
524                Ok(None)
525            }
526            DydxWsMessage::Subscribed(sub) => {
527                tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
528                let topic = self.topic_from_msg(&sub.channel, &sub.id);
529                self.subscriptions.confirm_subscribe(&topic);
530                Ok(None)
531            }
532            DydxWsMessage::SubaccountsSubscribed(msg) => {
533                tracing::debug!("Subaccounts subscribed with initial state");
534                let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
535                self.subscriptions.confirm_subscribe(&topic);
536                self.parse_subaccounts_subscribed(&msg)
537            }
538            DydxWsMessage::Unsubscribed(unsub) => {
539                tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
540                let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
541                self.subscriptions.confirm_unsubscribe(&topic);
542                Ok(None)
543            }
544            DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
545            DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
546            DydxWsMessage::BlockHeight(height) => Ok(Some(NautilusWsMessage::BlockHeight(height))),
547            DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
548            DydxWsMessage::Reconnected => {
549                if let Err(e) = self.replay_subscriptions().await {
550                    tracing::error!("Failed to replay subscriptions after reconnect message: {e}");
551                }
552                Ok(Some(NautilusWsMessage::Reconnected))
553            }
554            DydxWsMessage::Pong => Ok(None),
555            DydxWsMessage::Raw(_) => Ok(None),
556        }
557    }
558
559    fn handle_channel_data(
560        &self,
561        data: DydxWsChannelDataMsg,
562    ) -> DydxWsResult<Option<NautilusWsMessage>> {
563        match data.channel {
564            DydxWsChannel::Trades => self.parse_trades(&data),
565            DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
566            DydxWsChannel::Candles => self.parse_candles(&data),
567            DydxWsChannel::Markets => self.parse_markets(&data),
568            DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
569                self.parse_subaccounts(&data)
570            }
571            DydxWsChannel::BlockHeight => self.parse_block_height(&data),
572            DydxWsChannel::Unknown => {
573                tracing::warn!(
574                    "Unknown channel data received: id={:?}, msg_type={:?}",
575                    data.id,
576                    data.msg_type
577                );
578                Ok(None)
579            }
580        }
581    }
582
583    fn handle_channel_batch_data(
584        &self,
585        data: DydxWsChannelBatchDataMsg,
586    ) -> DydxWsResult<Option<NautilusWsMessage>> {
587        match data.channel {
588            DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
589            _ => {
590                tracing::warn!(
591                    "Unexpected batch data for channel: {:?}, id={:?}",
592                    data.channel,
593                    data.id
594                );
595                Ok(None)
596            }
597        }
598    }
599
600    fn parse_block_height(
601        &self,
602        data: &DydxWsChannelDataMsg,
603    ) -> DydxWsResult<Option<NautilusWsMessage>> {
604        let contents: DydxBlockHeightChannelContents =
605            serde_json::from_value(data.contents.clone()).map_err(|e| {
606                DydxWsError::Parse(format!("Failed to parse block height contents: {e}"))
607            })?;
608
609        let height = contents
610            .block_height
611            .parse::<u64>()
612            .map_err(|e| DydxWsError::Parse(format!("Failed to parse block height: {e}")))?;
613
614        Ok(Some(NautilusWsMessage::BlockHeight(height)))
615    }
616
617    fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
618        let symbol = data
619            .id
620            .as_ref()
621            .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
622
623        let instrument_id = self.parse_instrument_id(symbol)?;
624        let instrument = self.get_instrument(&instrument_id)?;
625
626        let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
627            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
628
629        let mut ticks = Vec::new();
630        let ts_init = get_atomic_clock_realtime().get_time_ns();
631
632        for trade in contents.trades {
633            let aggressor_side = match trade.side {
634                OrderSide::Buy => AggressorSide::Buyer,
635                OrderSide::Sell => AggressorSide::Seller,
636                _ => continue, // Skip NoOrderSide
637            };
638
639            let price = Decimal::from_str(&trade.price)
640                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
641
642            let size = Decimal::from_str(&trade.size)
643                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
644
645            let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
646                DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
647            })?;
648
649            let tick = TradeTick::new(
650                instrument_id,
651                Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
652                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
653                })?,
654                Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
655                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
656                })?,
657                aggressor_side,
658                TradeId::new(&trade.id),
659                UnixNanos::from(trade_ts as u64),
660                ts_init,
661            );
662            ticks.push(Data::Trade(tick));
663        }
664
665        if ticks.is_empty() {
666            Ok(None)
667        } else {
668            Ok(Some(NautilusWsMessage::Data(ticks)))
669        }
670    }
671
672    fn parse_orderbook(
673        &self,
674        data: &DydxWsChannelDataMsg,
675        is_snapshot: bool,
676    ) -> DydxWsResult<Option<NautilusWsMessage>> {
677        let symbol = data
678            .id
679            .as_ref()
680            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
681
682        let instrument_id = self.parse_instrument_id(symbol)?;
683        let instrument = self.get_instrument(&instrument_id)?;
684
685        let ts_init = get_atomic_clock_realtime().get_time_ns();
686
687        if is_snapshot {
688            let contents: DydxOrderbookSnapshotContents =
689                serde_json::from_value(data.contents.clone()).map_err(|e| {
690                    DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
691                })?;
692
693            let deltas = self.parse_orderbook_snapshot(
694                &instrument_id,
695                &contents,
696                instrument.price_precision(),
697                instrument.size_precision(),
698                ts_init,
699            )?;
700
701            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
702        } else {
703            let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
704                .map_err(|e| {
705                    DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
706                })?;
707
708            let deltas = self.parse_orderbook_deltas(
709                &instrument_id,
710                &contents,
711                instrument.price_precision(),
712                instrument.size_precision(),
713                ts_init,
714            )?;
715
716            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
717        }
718    }
719
720    fn parse_orderbook_batch(
721        &self,
722        data: &DydxWsChannelBatchDataMsg,
723    ) -> DydxWsResult<Option<NautilusWsMessage>> {
724        let symbol = data
725            .id
726            .as_ref()
727            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
728
729        let instrument_id = self.parse_instrument_id(symbol)?;
730        let instrument = self.get_instrument(&instrument_id)?;
731
732        let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
733            .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
734
735        let ts_init = get_atomic_clock_realtime().get_time_ns();
736        let mut all_deltas = Vec::new();
737
738        let num_messages = contents.len();
739        for (idx, content) in contents.iter().enumerate() {
740            let is_last_message = idx == num_messages - 1;
741            let deltas = self.parse_orderbook_deltas_with_flag(
742                &instrument_id,
743                content,
744                instrument.price_precision(),
745                instrument.size_precision(),
746                ts_init,
747                is_last_message,
748            )?;
749            all_deltas.extend(deltas);
750        }
751
752        let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
753        Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
754    }
755
756    fn parse_orderbook_snapshot(
757        &self,
758        instrument_id: &InstrumentId,
759        contents: &DydxOrderbookSnapshotContents,
760        price_precision: u8,
761        size_precision: u8,
762        ts_init: UnixNanos,
763    ) -> DydxWsResult<OrderBookDeltas> {
764        let mut deltas = Vec::new();
765        deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
766
767        let bids = contents.bids.as_deref().unwrap_or(&[]);
768        let asks = contents.asks.as_deref().unwrap_or(&[]);
769
770        let bids_len = bids.len();
771        let asks_len = asks.len();
772
773        for (idx, bid) in bids.iter().enumerate() {
774            let is_last = idx == bids_len - 1 && asks_len == 0;
775            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
776
777            let price = Decimal::from_str(&bid.price)
778                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
779
780            let size = Decimal::from_str(&bid.size)
781                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
782
783            let order = BookOrder::new(
784                OrderSide::Buy,
785                Price::from_decimal_dp(price, price_precision).map_err(|e| {
786                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
787                })?,
788                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
789                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
790                })?,
791                0,
792            );
793
794            deltas.push(OrderBookDelta::new(
795                *instrument_id,
796                BookAction::Add,
797                order,
798                flags,
799                0,
800                ts_init,
801                ts_init,
802            ));
803        }
804
805        for (idx, ask) in asks.iter().enumerate() {
806            let is_last = idx == asks_len - 1;
807            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
808
809            let price = Decimal::from_str(&ask.price)
810                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
811
812            let size = Decimal::from_str(&ask.size)
813                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
814
815            let order = BookOrder::new(
816                OrderSide::Sell,
817                Price::from_decimal_dp(price, price_precision).map_err(|e| {
818                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
819                })?,
820                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
821                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
822                })?,
823                0,
824            );
825
826            deltas.push(OrderBookDelta::new(
827                *instrument_id,
828                BookAction::Add,
829                order,
830                flags,
831                0,
832                ts_init,
833                ts_init,
834            ));
835        }
836
837        Ok(OrderBookDeltas::new(*instrument_id, deltas))
838    }
839
840    fn parse_orderbook_deltas(
841        &self,
842        instrument_id: &InstrumentId,
843        contents: &DydxOrderbookContents,
844        price_precision: u8,
845        size_precision: u8,
846        ts_init: UnixNanos,
847    ) -> DydxWsResult<OrderBookDeltas> {
848        let deltas = self.parse_orderbook_deltas_with_flag(
849            instrument_id,
850            contents,
851            price_precision,
852            size_precision,
853            ts_init,
854            true, // Mark as last message by default
855        )?;
856        Ok(OrderBookDeltas::new(*instrument_id, deltas))
857    }
858
859    #[allow(clippy::too_many_arguments)]
860    fn parse_orderbook_deltas_with_flag(
861        &self,
862        instrument_id: &InstrumentId,
863        contents: &DydxOrderbookContents,
864        price_precision: u8,
865        size_precision: u8,
866        ts_init: UnixNanos,
867        is_last_message: bool,
868    ) -> DydxWsResult<Vec<OrderBookDelta>> {
869        let mut deltas = Vec::new();
870
871        let bids = contents.bids.as_deref().unwrap_or(&[]);
872        let asks = contents.asks.as_deref().unwrap_or(&[]);
873
874        let bids_len = bids.len();
875        let asks_len = asks.len();
876
877        for (idx, (price_str, size_str)) in bids.iter().enumerate() {
878            let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
879            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
880
881            let price = Decimal::from_str(price_str)
882                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
883
884            let size = Decimal::from_str(size_str)
885                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
886
887            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
888                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
889            })?;
890            let action = if qty.is_zero() {
891                BookAction::Delete
892            } else {
893                BookAction::Update
894            };
895
896            let order = BookOrder::new(
897                OrderSide::Buy,
898                Price::from_decimal_dp(price, price_precision).map_err(|e| {
899                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
900                })?,
901                qty,
902                0,
903            );
904
905            deltas.push(OrderBookDelta::new(
906                *instrument_id,
907                action,
908                order,
909                flags,
910                0,
911                ts_init,
912                ts_init,
913            ));
914        }
915
916        for (idx, (price_str, size_str)) in asks.iter().enumerate() {
917            let is_last = is_last_message && idx == asks_len - 1;
918            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
919
920            let price = Decimal::from_str(price_str)
921                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
922
923            let size = Decimal::from_str(size_str)
924                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
925
926            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
927                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
928            })?;
929            let action = if qty.is_zero() {
930                BookAction::Delete
931            } else {
932                BookAction::Update
933            };
934
935            let order = BookOrder::new(
936                OrderSide::Sell,
937                Price::from_decimal_dp(price, price_precision).map_err(|e| {
938                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
939                })?,
940                qty,
941                0,
942            );
943
944            deltas.push(OrderBookDelta::new(
945                *instrument_id,
946                action,
947                order,
948                flags,
949                0,
950                ts_init,
951                ts_init,
952            ));
953        }
954
955        Ok(deltas)
956    }
957
958    fn parse_candles(
959        &self,
960        data: &DydxWsChannelDataMsg,
961    ) -> DydxWsResult<Option<NautilusWsMessage>> {
962        let topic = data
963            .id
964            .as_ref()
965            .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
966
967        let bar_type = self.bar_types.get(topic).ok_or_else(|| {
968            DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
969        })?;
970
971        let candle: DydxCandle = serde_json::from_value(data.contents.clone())
972            .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
973
974        let instrument_id = self.parse_instrument_id(&candle.ticker)?;
975        let instrument = self.get_instrument(&instrument_id)?;
976
977        let open = Decimal::from_str(&candle.open)
978            .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
979        let high = Decimal::from_str(&candle.high)
980            .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
981        let low = Decimal::from_str(&candle.low)
982            .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
983        let close = Decimal::from_str(&candle.close)
984            .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
985        let volume = Decimal::from_str(&candle.base_token_volume)
986            .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
987
988        let ts_init = get_atomic_clock_realtime().get_time_ns();
989
990        let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
991            DydxWsError::Parse(format!(
992                "Timestamp out of range for candle at {}",
993                candle.started_at
994            ))
995        })?;
996        let interval_nanos = get_bar_interval_ns(bar_type);
997        let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
998
999        let bar = Bar::new(
1000            *bar_type,
1001            Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
1002                DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
1003            })?,
1004            Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
1005                DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
1006            })?,
1007            Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
1008                DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
1009            })?,
1010            Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
1011                DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
1012            })?,
1013            Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
1014                DydxWsError::Parse(format!(
1015                    "Failed to create volume Quantity from decimal: {e}"
1016                ))
1017            })?,
1018            ts_event,
1019            ts_init,
1020        );
1021
1022        Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
1023    }
1024
1025    fn parse_markets(
1026        &self,
1027        data: &DydxWsChannelDataMsg,
1028    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1029        let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
1030            .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
1031
1032        // Markets channel provides oracle price updates needed for margin calculations
1033        // Forward to execution client to update oracle_prices map
1034        if let Some(oracle_prices) = contents.oracle_prices {
1035            tracing::debug!(
1036                "Forwarding oracle price updates for {} markets to execution client",
1037                oracle_prices.len()
1038            );
1039            return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
1040        }
1041
1042        Ok(None)
1043    }
1044
1045    fn parse_subaccounts(
1046        &self,
1047        data: &DydxWsChannelDataMsg,
1048    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1049        let contents: DydxWsSubaccountsChannelContents =
1050            serde_json::from_value(data.contents.clone()).map_err(|e| {
1051                DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1052            })?;
1053
1054        let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1055        let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1056
1057        if has_orders || has_fills {
1058            // Forward raw channel data to execution client for parsing
1059            // The execution client has the clob_pair_id and instrument mappings needed
1060            tracing::debug!(
1061                "Received {} order(s), {} fill(s) - forwarding to execution client",
1062                contents.orders.as_ref().map_or(0, |o| o.len()),
1063                contents.fills.as_ref().map_or(0, |f| f.len())
1064            );
1065
1066            let channel_data = DydxWsSubaccountsChannelData {
1067                msg_type: data.msg_type,
1068                connection_id: data.connection_id.clone(),
1069                message_id: data.message_id,
1070                id: data.id.clone().unwrap_or_default(),
1071                channel: data.channel,
1072                version: data.version.clone().unwrap_or_default(),
1073                contents,
1074            };
1075
1076            return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
1077                channel_data,
1078            ))));
1079        }
1080
1081        Ok(None)
1082    }
1083
1084    fn parse_subaccounts_subscribed(
1085        &self,
1086        msg: &DydxWsSubaccountsSubscribed,
1087    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1088        // Pass raw subaccount subscription to execution client for parsing
1089        // The execution client has access to instruments and oracle prices needed for margin calculations
1090        tracing::debug!("Forwarding subaccount subscription to execution client");
1091        Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
1092            msg.clone(),
1093        ))))
1094    }
1095
1096    fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1097        // dYdX WS uses raw symbols (e.g., "BTC-USD")
1098        // Need to append "-PERP" to match Nautilus instrument IDs
1099        let symbol_with_perp = format!("{symbol}-PERP");
1100        Ok(parse_instrument_id(&symbol_with_perp))
1101    }
1102
1103    fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1104        self.instruments
1105            .get(&instrument_id.symbol.inner())
1106            .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1107    }
1108}
1109
1110/// Determines if a dYdX WebSocket error should trigger a retry.
1111fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1112    match error {
1113        DydxWsError::Transport(_) => true,
1114        DydxWsError::Send(_) => true,
1115        DydxWsError::ClientError(msg) => {
1116            let msg_lower = msg.to_lowercase();
1117            msg_lower.contains("timeout")
1118                || msg_lower.contains("timed out")
1119                || msg_lower.contains("connection")
1120                || msg_lower.contains("network")
1121        }
1122        DydxWsError::NotConnected
1123        | DydxWsError::Json(_)
1124        | DydxWsError::Parse(_)
1125        | DydxWsError::Authentication(_)
1126        | DydxWsError::Subscription(_)
1127        | DydxWsError::Venue(_) => false,
1128    }
1129}
1130
1131/// Creates a timeout error for the retry manager.
1132fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1133    DydxWsError::ClientError(msg)
1134}