nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! This module processes incoming WebSocket messages and converts them into
19//! Nautilus domain objects.
20//!
21//! The handler owns the WebSocketClient exclusively and runs in a dedicated
22//! Tokio task within the lock-free I/O boundary.
23
24use std::{
25    fmt::Debug,
26    str::FromStr,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36    data::{
37        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38        bar::get_bar_interval_ns,
39    },
40    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41    identifiers::{AccountId, InstrumentId, TradeId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{
46    RECONNECTED,
47    retry::{RetryManager, create_websocket_retry_manager},
48    websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55    DydxWsError, DydxWsResult,
56    client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57    enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58    error::DydxWebSocketError,
59    messages::{
60        DydxBlockHeightChannelContents, DydxCandle, DydxMarketsContents, DydxOrderbookContents,
61        DydxOrderbookSnapshotContents, DydxTradeContents, DydxWsBlockHeightMessage,
62        DydxWsCandlesMessage, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
63        DydxWsFeedMessage, DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
64        DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage,
65        DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg, DydxWsTradesMessage,
66    },
67};
68use crate::common::parse::parse_instrument_id;
69
70/// Commands sent to the feed handler.
71#[derive(Debug, Clone)]
72pub enum HandlerCommand {
73    /// Update a single instrument in the cache.
74    UpdateInstrument(Box<InstrumentAny>),
75    /// Initialize instruments in bulk.
76    InitializeInstruments(Vec<InstrumentAny>),
77    /// Register a bar type for candle subscriptions.
78    RegisterBarType { topic: String, bar_type: BarType },
79    /// Unregister a bar type for candle subscriptions.
80    UnregisterBarType { topic: String },
81    /// Send a text message via WebSocket.
82    SendText(String),
83}
84
85/// Processes incoming WebSocket messages and converts them to Nautilus domain objects.
86///
87/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
88/// eliminating RwLock contention on the hot path.
89pub struct FeedHandler {
90    /// Account ID for parsing account-specific messages.
91    account_id: Option<AccountId>,
92    /// Command receiver from outer client.
93    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
94    /// Output sender for Nautilus messages.
95    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
96    /// Raw WebSocket message receiver.
97    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
98    /// Owned WebSocket client (no RwLock).
99    client: WebSocketClient,
100    /// Manual disconnect signal.
101    signal: Arc<AtomicBool>,
102    /// Retry manager for WebSocket send operations.
103    retry_manager: RetryManager<DydxWsError>,
104    /// Cached instruments for parsing market data.
105    instruments: AHashMap<Ustr, InstrumentAny>,
106    /// Cached bar types by topic (e.g., "BTC-USD/1MIN").
107    bar_types: AHashMap<String, BarType>,
108    /// Subscription state shared with the outer client for replay/acks.
109    subscriptions: SubscriptionState,
110}
111
112impl Debug for FeedHandler {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct(stringify!(FeedHandler))
115            .field("account_id", &self.account_id)
116            .field("instruments_count", &self.instruments.len())
117            .field("bar_types_count", &self.bar_types.len())
118            .finish_non_exhaustive()
119    }
120}
121
122impl FeedHandler {
123    /// Creates a new [`FeedHandler`].
124    #[must_use]
125    pub fn new(
126        account_id: Option<AccountId>,
127        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
128        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
129        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
130        client: WebSocketClient,
131        signal: Arc<AtomicBool>,
132        subscriptions: SubscriptionState,
133    ) -> Self {
134        Self {
135            account_id,
136            cmd_rx,
137            out_tx,
138            raw_rx,
139            client,
140            signal,
141            retry_manager: create_websocket_retry_manager(),
142            instruments: AHashMap::new(),
143            bar_types: AHashMap::new(),
144            subscriptions,
145        }
146    }
147
148    /// Sends a WebSocket message with retry logic.
149    ///
150    /// Uses the configured [`RetryManager`] to handle transient failures.
151    async fn send_with_retry(
152        &self,
153        payload: String,
154        rate_limit_keys: Option<Vec<String>>,
155    ) -> Result<(), DydxWsError> {
156        self.retry_manager
157            .execute_with_retry(
158                "websocket_send",
159                || {
160                    let payload = payload.clone();
161                    let keys = rate_limit_keys.clone();
162                    async move {
163                        self.client
164                            .send_text(payload, keys)
165                            .await
166                            .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
167                    }
168                },
169                should_retry_dydx_error,
170                create_dydx_timeout_error,
171            )
172            .await
173    }
174
175    /// Main processing loop for the handler.
176    pub async fn run(&mut self) {
177        loop {
178            tokio::select! {
179                Some(cmd) = self.cmd_rx.recv() => {
180                    self.handle_command(cmd).await;
181                }
182
183                Some(msg) = self.raw_rx.recv() => {
184                    if let Some(nautilus_msg) = self.process_raw_message(msg).await
185                        && self.out_tx.send(nautilus_msg).is_err()
186                    {
187                        log::debug!("Receiver dropped, stopping handler");
188                        break;
189                    }
190                }
191
192                else => {
193                    log::debug!("Handler shutting down: channels closed");
194                    break;
195                }
196            }
197
198            if self.signal.load(Ordering::Relaxed) {
199                log::debug!("Handler received stop signal");
200                break;
201            }
202        }
203    }
204
205    /// Processes a raw WebSocket message.
206    async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
207        match msg {
208            Message::Text(txt) => {
209                if txt == RECONNECTED {
210                    if let Err(e) = self.replay_subscriptions().await {
211                        log::error!("Failed to replay subscriptions after reconnect: {e}");
212                    }
213                    return Some(NautilusWsMessage::Reconnected);
214                }
215
216                match serde_json::from_str::<serde_json::Value>(&txt) {
217                    Ok(val) => {
218                        let val_clone = val.clone();
219
220                        // Try two-level parsing first (channel → type)
221                        if let Ok(feed_msg) =
222                            serde_json::from_value::<DydxWsFeedMessage>(val.clone())
223                        {
224                            return self.handle_feed_message(feed_msg).await;
225                        }
226
227                        // Fall back to single-level parsing for non-channel messages
228                        // (connected, error, subscribed/unsubscribed without channel data)
229                        match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
230                            Ok(meta) => {
231                                let result = if meta.is_connected() {
232                                    serde_json::from_value::<DydxWsConnectedMsg>(val)
233                                        .map(DydxWsMessage::Connected)
234                                } else if meta.is_subscribed() {
235                                    if let Ok(sub_msg) =
236                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
237                                    {
238                                        if sub_msg.channel == DydxWsChannel::Subaccounts {
239                                            serde_json::from_value::<DydxWsSubaccountsSubscribed>(
240                                                val.clone(),
241                                            )
242                                            .map(DydxWsMessage::SubaccountsSubscribed)
243                                            .or_else(|e| {
244                                                log::debug!(
245                                                    "Failed to parse subaccounts subscription (fallback to standard): {e}. Raw: {}",
246                                                    serde_json::to_string(&val_clone)
247                                                        .unwrap_or_else(|_| "<invalid json>".into())
248                                                );
249                                                Ok(DydxWsMessage::Subscribed(sub_msg))
250                                            })
251                                        } else {
252                                            Ok(DydxWsMessage::Subscribed(sub_msg))
253                                        }
254                                    } else {
255                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val)
256                                            .map(DydxWsMessage::Subscribed)
257                                    }
258                                } else if meta.is_unsubscribed() {
259                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
260                                        .map(DydxWsMessage::Unsubscribed)
261                                } else if meta.is_error() {
262                                    serde_json::from_value::<DydxWebSocketError>(val)
263                                        .map(DydxWsMessage::Error)
264                                } else if meta.is_unknown() {
265                                    log::warn!(
266                                        "Received unknown WebSocket message type: {}",
267                                        serde_json::to_string(&val_clone)
268                                            .unwrap_or_else(|_| "<invalid json>".into())
269                                    );
270                                    Ok(DydxWsMessage::Raw(val))
271                                } else {
272                                    Ok(DydxWsMessage::Raw(val))
273                                };
274
275                                match result {
276                                    Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
277                                    Err(e) => {
278                                        log::error!(
279                                            "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {}",
280                                            meta.msg_type,
281                                            meta.channel,
282                                            serde_json::to_string(&val_clone)
283                                                .unwrap_or_else(|_| "<invalid json>".into())
284                                        );
285                                        None
286                                    }
287                                }
288                            }
289                            Err(e) => {
290                                let raw_json = serde_json::to_string_pretty(&val_clone)
291                                    .unwrap_or_else(|_| format!("{val_clone:?}"));
292                                log::error!(
293                                    "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{raw_json}"
294                                );
295                                None
296                            }
297                        }
298                    }
299                    Err(e) => {
300                        let err = DydxWebSocketError::from_message(e.to_string());
301                        Some(NautilusWsMessage::Error(err))
302                    }
303                }
304            }
305            Message::Pong(_data) => None,
306            Message::Ping(_data) => None,  // Handled by lower layers
307            Message::Binary(_bin) => None, // dYdX uses text frames
308            Message::Close(_frame) => {
309                log::info!("WebSocket close frame received");
310                None
311            }
312            Message::Frame(_) => None,
313        }
314    }
315
316    /// Handles a parsed dYdX WebSocket message.
317    async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
318        match self.handle_message(msg).await {
319            Ok(opt_msg) => opt_msg,
320            Err(e) => {
321                log::error!("Error handling message: {e}");
322                None
323            }
324        }
325    }
326
327    /// Handles a two-level channel-tagged feed message.
328    async fn handle_feed_message(&self, feed_msg: DydxWsFeedMessage) -> Option<NautilusWsMessage> {
329        match feed_msg {
330            DydxWsFeedMessage::Subaccounts(msg) => match msg {
331                DydxWsSubaccountsMessage::Subscribed(data) => {
332                    self.handle_dydx_message(DydxWsMessage::SubaccountsSubscribed(data))
333                        .await
334                }
335                DydxWsSubaccountsMessage::ChannelData(data) => {
336                    self.handle_dydx_message(DydxWsMessage::ChannelData(DydxWsChannelDataMsg {
337                        msg_type: data.msg_type,
338                        connection_id: data.connection_id,
339                        message_id: data.message_id,
340                        channel: data.channel,
341                        id: Some(data.id),
342                        contents: serde_json::to_value(&data.contents)
343                            .unwrap_or(serde_json::Value::Null),
344                        version: Some(data.version),
345                    }))
346                    .await
347                }
348            },
349            DydxWsFeedMessage::Orderbook(msg) => match msg {
350                DydxWsOrderbookMessage::Subscribed(data)
351                | DydxWsOrderbookMessage::ChannelData(data) => {
352                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
353                        .await
354                }
355                DydxWsOrderbookMessage::ChannelBatchData(data) => {
356                    self.handle_dydx_message(DydxWsMessage::ChannelBatchData(data))
357                        .await
358                }
359            },
360            DydxWsFeedMessage::Trades(msg) => match msg {
361                DydxWsTradesMessage::Subscribed(data) | DydxWsTradesMessage::ChannelData(data) => {
362                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
363                        .await
364                }
365            },
366            DydxWsFeedMessage::Markets(msg) => match msg {
367                DydxWsMarketsMessage::Subscribed(data)
368                | DydxWsMarketsMessage::ChannelData(data) => {
369                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
370                        .await
371                }
372            },
373            DydxWsFeedMessage::Candles(msg) => match msg {
374                DydxWsCandlesMessage::Subscribed(data)
375                | DydxWsCandlesMessage::ChannelData(data) => {
376                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
377                        .await
378                }
379            },
380            DydxWsFeedMessage::ParentSubaccounts(msg) => match msg {
381                super::messages::DydxWsParentSubaccountsMessage::Subscribed(data)
382                | super::messages::DydxWsParentSubaccountsMessage::ChannelData(data) => {
383                    self.handle_dydx_message(DydxWsMessage::ChannelData(data))
384                        .await
385                }
386            },
387            DydxWsFeedMessage::BlockHeight(msg) => match msg {
388                DydxWsBlockHeightMessage::Subscribed(data) => {
389                    // Subscribed message uses "height" field, parse directly
390                    match data.contents.height.parse::<u64>() {
391                        Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
392                        Err(e) => {
393                            log::warn!("Failed to parse block height from subscription: {e}");
394                            None
395                        }
396                    }
397                }
398                DydxWsBlockHeightMessage::ChannelData(data) => {
399                    // Channel data uses "blockHeight" field, parse directly
400                    match data.contents.block_height.parse::<u64>() {
401                        Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
402                        Err(e) => {
403                            log::warn!("Failed to parse block height from channel data: {e}");
404                            None
405                        }
406                    }
407                }
408            },
409        }
410    }
411
412    /// Handles a command to update the internal state.
413    async fn handle_command(&mut self, command: HandlerCommand) {
414        match command {
415            HandlerCommand::UpdateInstrument(instrument) => {
416                let symbol = instrument.id().symbol.inner();
417                self.instruments.insert(symbol, *instrument);
418            }
419            HandlerCommand::InitializeInstruments(instruments) => {
420                for instrument in instruments {
421                    let symbol = instrument.id().symbol.inner();
422                    self.instruments.insert(symbol, instrument);
423                }
424            }
425            HandlerCommand::RegisterBarType { topic, bar_type } => {
426                self.bar_types.insert(topic, bar_type);
427            }
428            HandlerCommand::UnregisterBarType { topic } => {
429                self.bar_types.remove(&topic);
430            }
431            HandlerCommand::SendText(text) => {
432                if let Err(e) = self
433                    .send_with_retry(
434                        text,
435                        Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
436                    )
437                    .await
438                {
439                    log::error!("Failed to send WebSocket text after retries: {e}");
440                }
441            }
442        }
443    }
444
445    /// Registers a bar type for a specific topic (e.g., "BTC-USD/1MIN").
446    pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
447        self.bar_types.insert(topic, bar_type);
448    }
449
450    /// Unregisters a bar type for a specific topic.
451    pub fn unregister_bar_type(&mut self, topic: &str) {
452        self.bar_types.remove(topic);
453    }
454
455    fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
456        match id {
457            Some(id) => format!(
458                "{}{}{}",
459                channel.as_ref(),
460                self.subscriptions.delimiter(),
461                id
462            ),
463            None => channel.as_ref().to_string(),
464        }
465    }
466
467    fn subscription_from_topic(
468        &self,
469        topic: &str,
470        op: super::enums::DydxWsOperation,
471    ) -> Option<super::messages::DydxSubscription> {
472        let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
473            topic,
474            self.subscriptions.delimiter(),
475        );
476        let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
477        let id = symbol.map(std::string::ToString::to_string);
478
479        Some(super::messages::DydxSubscription { op, channel, id })
480    }
481
482    async fn replay_subscriptions(&self) -> DydxWsResult<()> {
483        let topics = self.subscriptions.all_topics();
484        for topic in topics {
485            let Some(subscription) =
486                self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
487            else {
488                log::warn!("Failed to reconstruct subscription from topic: {topic}");
489                continue;
490            };
491
492            let payload = serde_json::to_string(&subscription)?;
493            self.subscriptions.mark_subscribe(&topic);
494
495            if let Err(e) = self
496                .send_with_retry(
497                    payload,
498                    Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
499                )
500                .await
501            {
502                self.subscriptions.mark_failure(&topic);
503                return Err(e);
504            }
505        }
506
507        Ok(())
508    }
509
510    /// Processes a WebSocket message and converts it to Nautilus domain objects.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if message parsing fails.
515    #[allow(clippy::result_large_err)]
516    pub async fn handle_message(
517        &self,
518        msg: DydxWsMessage,
519    ) -> DydxWsResult<Option<NautilusWsMessage>> {
520        match msg {
521            DydxWsMessage::Connected(_) => {
522                log::info!("dYdX WebSocket connected");
523                Ok(None)
524            }
525            DydxWsMessage::Subscribed(sub) => {
526                log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
527                let topic = self.topic_from_msg(&sub.channel, &sub.id);
528                self.subscriptions.confirm_subscribe(&topic);
529                Ok(None)
530            }
531            DydxWsMessage::SubaccountsSubscribed(msg) => {
532                log::debug!("Subaccounts subscribed with initial state");
533                let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
534                self.subscriptions.confirm_subscribe(&topic);
535                self.parse_subaccounts_subscribed(&msg)
536            }
537            DydxWsMessage::Unsubscribed(unsub) => {
538                log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
539                let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
540                self.subscriptions.confirm_unsubscribe(&topic);
541                Ok(None)
542            }
543            DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
544            DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
545            DydxWsMessage::BlockHeight(height) => Ok(Some(NautilusWsMessage::BlockHeight(height))),
546            DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
547            DydxWsMessage::Reconnected => {
548                if let Err(e) = self.replay_subscriptions().await {
549                    log::error!("Failed to replay subscriptions after reconnect message: {e}");
550                }
551                Ok(Some(NautilusWsMessage::Reconnected))
552            }
553            DydxWsMessage::Pong => Ok(None),
554            DydxWsMessage::Raw(_) => Ok(None),
555        }
556    }
557
558    fn handle_channel_data(
559        &self,
560        data: DydxWsChannelDataMsg,
561    ) -> DydxWsResult<Option<NautilusWsMessage>> {
562        match data.channel {
563            DydxWsChannel::Trades => self.parse_trades(&data),
564            DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
565            DydxWsChannel::Candles => self.parse_candles(&data),
566            DydxWsChannel::Markets => self.parse_markets(&data),
567            DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
568                self.parse_subaccounts(&data)
569            }
570            DydxWsChannel::BlockHeight => self.parse_block_height(&data),
571            DydxWsChannel::Unknown => {
572                log::warn!(
573                    "Unknown channel data received: id={:?}, msg_type={:?}",
574                    data.id,
575                    data.msg_type
576                );
577                Ok(None)
578            }
579        }
580    }
581
582    fn handle_channel_batch_data(
583        &self,
584        data: DydxWsChannelBatchDataMsg,
585    ) -> DydxWsResult<Option<NautilusWsMessage>> {
586        match data.channel {
587            DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
588            _ => {
589                log::warn!(
590                    "Unexpected batch data for channel: {:?}, id={:?}",
591                    data.channel,
592                    data.id
593                );
594                Ok(None)
595            }
596        }
597    }
598
599    fn parse_block_height(
600        &self,
601        data: &DydxWsChannelDataMsg,
602    ) -> DydxWsResult<Option<NautilusWsMessage>> {
603        let contents: DydxBlockHeightChannelContents =
604            serde_json::from_value(data.contents.clone()).map_err(|e| {
605                DydxWsError::Parse(format!("Failed to parse block height contents: {e}"))
606            })?;
607
608        let height = contents
609            .block_height
610            .parse::<u64>()
611            .map_err(|e| DydxWsError::Parse(format!("Failed to parse block height: {e}")))?;
612
613        Ok(Some(NautilusWsMessage::BlockHeight(height)))
614    }
615
616    fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
617        let symbol = data
618            .id
619            .as_ref()
620            .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
621
622        let instrument_id = self.parse_instrument_id(symbol)?;
623        let instrument = self.get_instrument(&instrument_id)?;
624
625        let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
626            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
627
628        let mut ticks = Vec::new();
629        let ts_init = get_atomic_clock_realtime().get_time_ns();
630
631        for trade in contents.trades {
632            let aggressor_side = match trade.side {
633                OrderSide::Buy => AggressorSide::Buyer,
634                OrderSide::Sell => AggressorSide::Seller,
635                _ => continue, // Skip NoOrderSide
636            };
637
638            let price = Decimal::from_str(&trade.price)
639                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
640
641            let size = Decimal::from_str(&trade.size)
642                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
643
644            let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
645                DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
646            })?;
647
648            let tick = TradeTick::new(
649                instrument_id,
650                Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
651                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
652                })?,
653                Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
654                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
655                })?,
656                aggressor_side,
657                TradeId::new(&trade.id),
658                UnixNanos::from(trade_ts as u64),
659                ts_init,
660            );
661            ticks.push(Data::Trade(tick));
662        }
663
664        if ticks.is_empty() {
665            Ok(None)
666        } else {
667            Ok(Some(NautilusWsMessage::Data(ticks)))
668        }
669    }
670
671    fn parse_orderbook(
672        &self,
673        data: &DydxWsChannelDataMsg,
674        is_snapshot: bool,
675    ) -> DydxWsResult<Option<NautilusWsMessage>> {
676        let symbol = data
677            .id
678            .as_ref()
679            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
680
681        let instrument_id = self.parse_instrument_id(symbol)?;
682        let instrument = self.get_instrument(&instrument_id)?;
683
684        let ts_init = get_atomic_clock_realtime().get_time_ns();
685
686        if is_snapshot {
687            let contents: DydxOrderbookSnapshotContents =
688                serde_json::from_value(data.contents.clone()).map_err(|e| {
689                    DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
690                })?;
691
692            let deltas = self.parse_orderbook_snapshot(
693                &instrument_id,
694                &contents,
695                instrument.price_precision(),
696                instrument.size_precision(),
697                ts_init,
698            )?;
699
700            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
701        } else {
702            let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
703                .map_err(|e| {
704                    DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
705                })?;
706
707            let deltas = self.parse_orderbook_deltas(
708                &instrument_id,
709                &contents,
710                instrument.price_precision(),
711                instrument.size_precision(),
712                ts_init,
713            )?;
714
715            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
716        }
717    }
718
719    fn parse_orderbook_batch(
720        &self,
721        data: &DydxWsChannelBatchDataMsg,
722    ) -> DydxWsResult<Option<NautilusWsMessage>> {
723        let symbol = data
724            .id
725            .as_ref()
726            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
727
728        let instrument_id = self.parse_instrument_id(symbol)?;
729        let instrument = self.get_instrument(&instrument_id)?;
730
731        let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
732            .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
733
734        let ts_init = get_atomic_clock_realtime().get_time_ns();
735        let mut all_deltas = Vec::new();
736
737        let num_messages = contents.len();
738        for (idx, content) in contents.iter().enumerate() {
739            let is_last_message = idx == num_messages - 1;
740            let deltas = self.parse_orderbook_deltas_with_flag(
741                &instrument_id,
742                content,
743                instrument.price_precision(),
744                instrument.size_precision(),
745                ts_init,
746                is_last_message,
747            )?;
748            all_deltas.extend(deltas);
749        }
750
751        let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
752        Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
753    }
754
755    fn parse_orderbook_snapshot(
756        &self,
757        instrument_id: &InstrumentId,
758        contents: &DydxOrderbookSnapshotContents,
759        price_precision: u8,
760        size_precision: u8,
761        ts_init: UnixNanos,
762    ) -> DydxWsResult<OrderBookDeltas> {
763        let mut deltas = Vec::new();
764        deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
765
766        let bids = contents.bids.as_deref().unwrap_or(&[]);
767        let asks = contents.asks.as_deref().unwrap_or(&[]);
768
769        let bids_len = bids.len();
770        let asks_len = asks.len();
771
772        for (idx, bid) in bids.iter().enumerate() {
773            let is_last = idx == bids_len - 1 && asks_len == 0;
774            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
775
776            let price = Decimal::from_str(&bid.price)
777                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
778
779            let size = Decimal::from_str(&bid.size)
780                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
781
782            let order = BookOrder::new(
783                OrderSide::Buy,
784                Price::from_decimal_dp(price, price_precision).map_err(|e| {
785                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
786                })?,
787                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
788                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
789                })?,
790                0,
791            );
792
793            deltas.push(OrderBookDelta::new(
794                *instrument_id,
795                BookAction::Add,
796                order,
797                flags,
798                0,
799                ts_init,
800                ts_init,
801            ));
802        }
803
804        for (idx, ask) in asks.iter().enumerate() {
805            let is_last = idx == asks_len - 1;
806            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
807
808            let price = Decimal::from_str(&ask.price)
809                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
810
811            let size = Decimal::from_str(&ask.size)
812                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
813
814            let order = BookOrder::new(
815                OrderSide::Sell,
816                Price::from_decimal_dp(price, price_precision).map_err(|e| {
817                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
818                })?,
819                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
820                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
821                })?,
822                0,
823            );
824
825            deltas.push(OrderBookDelta::new(
826                *instrument_id,
827                BookAction::Add,
828                order,
829                flags,
830                0,
831                ts_init,
832                ts_init,
833            ));
834        }
835
836        Ok(OrderBookDeltas::new(*instrument_id, deltas))
837    }
838
839    fn parse_orderbook_deltas(
840        &self,
841        instrument_id: &InstrumentId,
842        contents: &DydxOrderbookContents,
843        price_precision: u8,
844        size_precision: u8,
845        ts_init: UnixNanos,
846    ) -> DydxWsResult<OrderBookDeltas> {
847        let deltas = self.parse_orderbook_deltas_with_flag(
848            instrument_id,
849            contents,
850            price_precision,
851            size_precision,
852            ts_init,
853            true, // Mark as last message by default
854        )?;
855        Ok(OrderBookDeltas::new(*instrument_id, deltas))
856    }
857
858    #[allow(clippy::too_many_arguments)]
859    fn parse_orderbook_deltas_with_flag(
860        &self,
861        instrument_id: &InstrumentId,
862        contents: &DydxOrderbookContents,
863        price_precision: u8,
864        size_precision: u8,
865        ts_init: UnixNanos,
866        is_last_message: bool,
867    ) -> DydxWsResult<Vec<OrderBookDelta>> {
868        let mut deltas = Vec::new();
869
870        let bids = contents.bids.as_deref().unwrap_or(&[]);
871        let asks = contents.asks.as_deref().unwrap_or(&[]);
872
873        let bids_len = bids.len();
874        let asks_len = asks.len();
875
876        for (idx, (price_str, size_str)) in bids.iter().enumerate() {
877            let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
878            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
879
880            let price = Decimal::from_str(price_str)
881                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
882
883            let size = Decimal::from_str(size_str)
884                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
885
886            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
887                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
888            })?;
889            let action = if qty.is_zero() {
890                BookAction::Delete
891            } else {
892                BookAction::Update
893            };
894
895            let order = BookOrder::new(
896                OrderSide::Buy,
897                Price::from_decimal_dp(price, price_precision).map_err(|e| {
898                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
899                })?,
900                qty,
901                0,
902            );
903
904            deltas.push(OrderBookDelta::new(
905                *instrument_id,
906                action,
907                order,
908                flags,
909                0,
910                ts_init,
911                ts_init,
912            ));
913        }
914
915        for (idx, (price_str, size_str)) in asks.iter().enumerate() {
916            let is_last = is_last_message && idx == asks_len - 1;
917            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
918
919            let price = Decimal::from_str(price_str)
920                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
921
922            let size = Decimal::from_str(size_str)
923                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
924
925            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
926                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
927            })?;
928            let action = if qty.is_zero() {
929                BookAction::Delete
930            } else {
931                BookAction::Update
932            };
933
934            let order = BookOrder::new(
935                OrderSide::Sell,
936                Price::from_decimal_dp(price, price_precision).map_err(|e| {
937                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
938                })?,
939                qty,
940                0,
941            );
942
943            deltas.push(OrderBookDelta::new(
944                *instrument_id,
945                action,
946                order,
947                flags,
948                0,
949                ts_init,
950                ts_init,
951            ));
952        }
953
954        Ok(deltas)
955    }
956
957    fn parse_candles(
958        &self,
959        data: &DydxWsChannelDataMsg,
960    ) -> DydxWsResult<Option<NautilusWsMessage>> {
961        let topic = data
962            .id
963            .as_ref()
964            .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
965
966        let bar_type = self.bar_types.get(topic).ok_or_else(|| {
967            DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
968        })?;
969
970        let candle: DydxCandle = serde_json::from_value(data.contents.clone())
971            .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
972
973        let instrument_id = self.parse_instrument_id(&candle.ticker)?;
974        let instrument = self.get_instrument(&instrument_id)?;
975
976        let open = Decimal::from_str(&candle.open)
977            .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
978        let high = Decimal::from_str(&candle.high)
979            .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
980        let low = Decimal::from_str(&candle.low)
981            .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
982        let close = Decimal::from_str(&candle.close)
983            .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
984        let volume = Decimal::from_str(&candle.base_token_volume)
985            .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
986
987        let ts_init = get_atomic_clock_realtime().get_time_ns();
988
989        let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
990            DydxWsError::Parse(format!(
991                "Timestamp out of range for candle at {}",
992                candle.started_at
993            ))
994        })?;
995        let interval_nanos = get_bar_interval_ns(bar_type);
996        let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
997
998        let bar = Bar::new(
999            *bar_type,
1000            Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
1001                DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
1002            })?,
1003            Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
1004                DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
1005            })?,
1006            Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
1007                DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
1008            })?,
1009            Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
1010                DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
1011            })?,
1012            Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
1013                DydxWsError::Parse(format!(
1014                    "Failed to create volume Quantity from decimal: {e}"
1015                ))
1016            })?,
1017            ts_event,
1018            ts_init,
1019        );
1020
1021        Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
1022    }
1023
1024    fn parse_markets(
1025        &self,
1026        data: &DydxWsChannelDataMsg,
1027    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1028        let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
1029            .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
1030
1031        // Markets channel provides oracle price updates needed for margin calculations
1032        // Forward to execution client to update oracle_prices map
1033        if let Some(oracle_prices) = contents.oracle_prices {
1034            log::debug!(
1035                "Forwarding oracle price updates for {} markets to execution client",
1036                oracle_prices.len()
1037            );
1038            return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
1039        }
1040
1041        Ok(None)
1042    }
1043
1044    fn parse_subaccounts(
1045        &self,
1046        data: &DydxWsChannelDataMsg,
1047    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1048        let contents: DydxWsSubaccountsChannelContents =
1049            serde_json::from_value(data.contents.clone()).map_err(|e| {
1050                DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1051            })?;
1052
1053        let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1054        let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1055
1056        if has_orders || has_fills {
1057            // Forward raw channel data to execution client for parsing
1058            // The execution client has the clob_pair_id and instrument mappings needed
1059            log::debug!(
1060                "Received {} order(s), {} fill(s) - forwarding to execution client",
1061                contents.orders.as_ref().map_or(0, |o| o.len()),
1062                contents.fills.as_ref().map_or(0, |f| f.len())
1063            );
1064
1065            let channel_data = DydxWsSubaccountsChannelData {
1066                msg_type: data.msg_type,
1067                connection_id: data.connection_id.clone(),
1068                message_id: data.message_id,
1069                id: data.id.clone().unwrap_or_default(),
1070                channel: data.channel,
1071                version: data.version.clone().unwrap_or_default(),
1072                contents,
1073            };
1074
1075            return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
1076                channel_data,
1077            ))));
1078        }
1079
1080        Ok(None)
1081    }
1082
1083    fn parse_subaccounts_subscribed(
1084        &self,
1085        msg: &DydxWsSubaccountsSubscribed,
1086    ) -> DydxWsResult<Option<NautilusWsMessage>> {
1087        // Pass raw subaccount subscription to execution client for parsing
1088        // The execution client has access to instruments and oracle prices needed for margin calculations
1089        log::debug!("Forwarding subaccount subscription to execution client");
1090        Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
1091            msg.clone(),
1092        ))))
1093    }
1094
1095    fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1096        // dYdX WS uses raw symbols (e.g., "BTC-USD")
1097        // Need to append "-PERP" to match Nautilus instrument IDs
1098        let symbol_with_perp = format!("{symbol}-PERP");
1099        Ok(parse_instrument_id(&symbol_with_perp))
1100    }
1101
1102    fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1103        self.instruments
1104            .get(&instrument_id.symbol.inner())
1105            .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1106    }
1107}
1108
1109/// Determines if a dYdX WebSocket error should trigger a retry.
1110fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1111    match error {
1112        DydxWsError::Transport(_) => true,
1113        DydxWsError::Send(_) => true,
1114        DydxWsError::ClientError(msg) => {
1115            let msg_lower = msg.to_lowercase();
1116            msg_lower.contains("timeout")
1117                || msg_lower.contains("timed out")
1118                || msg_lower.contains("connection")
1119                || msg_lower.contains("network")
1120        }
1121        DydxWsError::NotConnected
1122        | DydxWsError::Json(_)
1123        | DydxWsError::Parse(_)
1124        | DydxWsError::Authentication(_)
1125        | DydxWsError::Subscription(_)
1126        | DydxWsError::Venue(_) => false,
1127    }
1128}
1129
1130/// Creates a timeout error for the retry manager.
1131fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1132    DydxWsError::ClientError(msg)
1133}