Skip to main content

nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! This module processes incoming WebSocket messages and converts them into
19//! Nautilus domain objects.
20//!
21//! The handler owns the WebSocketClient exclusively and runs in a dedicated
22//! Tokio task within the lock-free I/O boundary.
23
24use std::{
25    collections::VecDeque,
26    fmt::Debug,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_core::time::get_atomic_clock_realtime;
35use nautilus_model::{
36    data::{
37        Bar, BarType, Data, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
38    },
39    identifiers::{AccountId, InstrumentId},
40    instruments::{Instrument, InstrumentAny},
41    types::Price,
42};
43use nautilus_network::{
44    RECONNECTED,
45    retry::{RetryManager, create_websocket_retry_manager},
46    websocket::{SubscriptionState, WebSocketClient},
47};
48use rust_decimal::Decimal;
49use tokio_tungstenite::tungstenite::Message;
50use ustr::Ustr;
51
52use super::{
53    DydxWsError, DydxWsResult,
54    client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
55    enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
56    error::DydxWebSocketError,
57    messages::{
58        DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
59        DydxSubscription, DydxTradeContents, DydxWsBlockHeightMessage, DydxWsCandlesMessage,
60        DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsFeedMessage,
61        DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
62        DydxWsParentSubaccountsMessage, DydxWsSubaccountsChannelContents,
63        DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage, DydxWsSubaccountsSubscribed,
64        DydxWsSubscriptionMsg, DydxWsTradesMessage,
65    },
66    parse as ws_parse,
67};
68use crate::common::parse::parse_instrument_id;
69
70/// Commands sent to the feed handler.
71#[derive(Debug, Clone)]
72pub enum HandlerCommand {
73    /// Update a single instrument in the cache.
74    UpdateInstrument(Box<InstrumentAny>),
75    /// Initialize instruments in bulk.
76    InitializeInstruments(Vec<InstrumentAny>),
77    /// Register a bar type for candle subscriptions.
78    RegisterBarType { topic: String, bar_type: BarType },
79    /// Unregister a bar type for candle subscriptions.
80    UnregisterBarType { topic: String },
81    /// Register a subscription message for replay.
82    RegisterSubscription {
83        topic: String,
84        subscription: DydxSubscription,
85    },
86    /// Unregister a subscription message.
87    UnregisterSubscription { topic: String },
88    /// Send a text message via WebSocket.
89    SendText(String),
90}
91
92/// Processes incoming WebSocket messages and converts them to Nautilus domain objects.
93///
94/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
95/// eliminating RwLock contention on the hot path.
96pub struct FeedHandler {
97    /// Account ID for parsing account-specific messages.
98    account_id: Option<AccountId>,
99    /// Command receiver from outer client.
100    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
101    /// Output sender for Nautilus messages.
102    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
103    /// Raw WebSocket message receiver.
104    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
105    /// Owned WebSocket client (no RwLock).
106    client: WebSocketClient,
107    /// Manual disconnect signal.
108    signal: Arc<AtomicBool>,
109    /// Retry manager for WebSocket send operations.
110    retry_manager: RetryManager<DydxWsError>,
111    /// Cached instruments for parsing market data.
112    instruments: AHashMap<Ustr, InstrumentAny>,
113    /// Cached bar types by topic (e.g., "BTC-USD/1MIN").
114    bar_types: AHashMap<String, BarType>,
115    /// Subscription state shared with the outer client for replay/acks.
116    subscriptions: SubscriptionState,
117    /// Original subscription messages by topic (for replay without reconstruction).
118    subscription_messages: AHashMap<String, DydxSubscription>,
119    /// Buffer for multiple messages produced from a single raw message.
120    message_buffer: VecDeque<NautilusWsMessage>,
121    /// Tracks last seen message_id per orderbook topic for gap detection.
122    book_sequence: AHashMap<String, u64>,
123    /// Pending (incomplete) bars per candle topic for emit-on-next logic.
124    pending_bars: AHashMap<String, Bar>,
125    /// Whether to timestamp bars at close time (open + interval).
126    bars_timestamp_on_close: bool,
127}
128
129impl Debug for FeedHandler {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct(stringify!(FeedHandler))
132            .field("account_id", &self.account_id)
133            .field("instruments_count", &self.instruments.len())
134            .field("bar_types_count", &self.bar_types.len())
135            .finish_non_exhaustive()
136    }
137}
138
139impl FeedHandler {
140    /// Creates a new [`FeedHandler`].
141    #[must_use]
142    #[allow(clippy::too_many_arguments)]
143    pub fn new(
144        account_id: Option<AccountId>,
145        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
146        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
147        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
148        client: WebSocketClient,
149        signal: Arc<AtomicBool>,
150        subscriptions: SubscriptionState,
151        bars_timestamp_on_close: bool,
152    ) -> Self {
153        Self {
154            account_id,
155            cmd_rx,
156            out_tx,
157            raw_rx,
158            client,
159            signal,
160            retry_manager: create_websocket_retry_manager(),
161            instruments: AHashMap::new(),
162            bar_types: AHashMap::new(),
163            subscriptions,
164            subscription_messages: AHashMap::new(),
165            message_buffer: VecDeque::new(),
166            book_sequence: AHashMap::new(),
167            pending_bars: AHashMap::new(),
168            bars_timestamp_on_close,
169        }
170    }
171
172    /// Sends a WebSocket message with retry logic.
173    ///
174    /// Uses the configured [`RetryManager`] to handle transient failures.
175    async fn send_with_retry(
176        &self,
177        payload: String,
178        rate_limit_keys: Option<&[Ustr]>,
179    ) -> Result<(), DydxWsError> {
180        let keys_owned: Option<Vec<Ustr>> = rate_limit_keys.map(|k| k.to_vec());
181        self.retry_manager
182            .execute_with_retry(
183                "websocket_send",
184                || {
185                    let payload = payload.clone();
186                    let keys = keys_owned.clone();
187                    async move {
188                        self.client
189                            .send_text(payload, keys.as_deref())
190                            .await
191                            .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
192                    }
193                },
194                should_retry_dydx_error,
195                create_dydx_timeout_error,
196            )
197            .await
198    }
199
200    /// Main processing loop for the handler.
201    ///
202    /// # Panics
203    ///
204    /// This method will not panic. The `expect` call on `iter.next()` is safe
205    /// because we explicitly check that `nautilus_msgs` is not empty before
206    /// calling it.
207    pub async fn run(&mut self) {
208        log::debug!("WebSocket handler started");
209        loop {
210            // First drain any buffered messages
211            if !self.message_buffer.is_empty() {
212                let nautilus_msg = self.message_buffer.pop_front().unwrap();
213                if self.out_tx.send(nautilus_msg).is_err() {
214                    log::debug!("Receiver dropped, stopping handler");
215                    break;
216                }
217                continue;
218            }
219
220            tokio::select! {
221                Some(cmd) = self.cmd_rx.recv() => {
222                    self.handle_command(cmd).await;
223                }
224
225                Some(msg) = self.raw_rx.recv() => {
226                    log::trace!("Handler received raw message");
227                    let nautilus_msgs = self.process_raw_message(msg).await;
228                    if !nautilus_msgs.is_empty() {
229                        let mut iter = nautilus_msgs.into_iter();
230                        // SAFETY: We just checked that nautilus_msgs is not empty
231                        let first = iter.next().expect("non-empty vec has first element");
232                        self.message_buffer.extend(iter);
233                        log::trace!("Handler sending message: {:?}", std::mem::discriminant(&first));
234                        if self.out_tx.send(first).is_err() {
235                            log::debug!("Receiver dropped, stopping handler");
236                            break;
237                        }
238                    }
239                }
240
241                else => {
242                    log::debug!("Handler shutting down: channels closed");
243                    break;
244                }
245            }
246
247            if self.signal.load(Ordering::Relaxed) {
248                log::debug!("Handler received stop signal");
249                break;
250            }
251        }
252    }
253
254    /// Processes a raw WebSocket message.
255    async fn process_raw_message(&mut self, msg: Message) -> Vec<NautilusWsMessage> {
256        match msg {
257            Message::Text(txt) => {
258                if txt == RECONNECTED {
259                    self.clear_state();
260                    if let Err(e) = self.replay_subscriptions().await {
261                        log::error!("Failed to replay subscriptions after reconnect: {e}");
262                    }
263                    return vec![NautilusWsMessage::Reconnected];
264                }
265
266                // Hot path: zero-copy parse for feed messages (orderbook/trades/candles)
267                match serde_json::from_str::<DydxWsFeedMessage>(&txt) {
268                    Ok(feed_msg) => {
269                        return self.handle_feed_message(feed_msg);
270                    }
271                    Err(e) => {
272                        // Log subaccounts channel failures at warn level for diagnosis
273                        if txt.contains("v4_subaccounts") {
274                            log::warn!(
275                                "[WS_DESER] Failed to parse v4_subaccounts as DydxWsFeedMessage: {e}\nRaw: {txt}"
276                            );
277                        }
278                    }
279                }
280
281                // Cold path: infrequent control messages (connected/subscribed/error)
282                match serde_json::from_str::<serde_json::Value>(&txt) {
283                    Ok(val) => match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
284                        Ok(meta) => {
285                            let result = if meta.is_connected() {
286                                serde_json::from_value::<DydxWsConnectedMsg>(val)
287                                    .map(DydxWsMessage::Connected)
288                            } else if meta.is_subscribed() {
289                                log::debug!("Processing subscribed message via fallback path");
290                                if let Ok(sub_msg) =
291                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
292                                {
293                                    if sub_msg.channel == DydxWsChannel::Subaccounts {
294                                        log::debug!("Parsing subaccounts subscription (fallback)");
295                                        serde_json::from_value::<DydxWsSubaccountsSubscribed>(val)
296                                            .map(DydxWsMessage::SubaccountsSubscribed)
297                                            .or_else(|e| {
298                                                log::warn!(
299                                                    "Failed to parse subaccounts subscription: {e}"
300                                                );
301                                                Ok(DydxWsMessage::Subscribed(sub_msg))
302                                            })
303                                    } else {
304                                        Ok(DydxWsMessage::Subscribed(sub_msg))
305                                    }
306                                } else {
307                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
308                                        .map(DydxWsMessage::Subscribed)
309                                }
310                            } else if meta.is_unsubscribed() {
311                                serde_json::from_value::<DydxWsSubscriptionMsg>(val)
312                                    .map(DydxWsMessage::Unsubscribed)
313                            } else if meta.is_error() {
314                                serde_json::from_value::<DydxWebSocketError>(val)
315                                    .map(DydxWsMessage::Error)
316                            } else if meta.is_unknown() {
317                                log::warn!("Received unknown WebSocket message type: {txt}",);
318                                Ok(DydxWsMessage::Raw(val))
319                            } else {
320                                Ok(DydxWsMessage::Raw(val))
321                            };
322
323                            match result {
324                                Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
325                                Err(e) => {
326                                    log::error!(
327                                        "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {txt}",
328                                        meta.msg_type,
329                                        meta.channel,
330                                    );
331                                    vec![]
332                                }
333                            }
334                        }
335                        Err(e) => {
336                            log::error!(
337                                "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{txt}"
338                            );
339                            vec![]
340                        }
341                    },
342                    Err(e) => {
343                        let err = DydxWebSocketError::from_message(e.to_string());
344                        vec![NautilusWsMessage::Error(err)]
345                    }
346                }
347            }
348            Message::Pong(_data) => vec![],
349            Message::Ping(_data) => vec![], // Handled by lower layers
350            Message::Binary(_bin) => vec![], // dYdX uses text frames
351            Message::Close(_frame) => {
352                log::info!("WebSocket close frame received");
353                vec![]
354            }
355            Message::Frame(_) => vec![],
356        }
357    }
358
359    /// Handles a parsed dYdX WebSocket message.
360    async fn handle_dydx_message(&mut self, msg: DydxWsMessage) -> Vec<NautilusWsMessage> {
361        match self.handle_message(msg).await {
362            Ok(msgs) => msgs,
363            Err(e) => {
364                log::error!("Error handling message: {e}");
365                vec![]
366            }
367        }
368    }
369
370    /// Dispatches feed messages directly to typed handlers.
371    fn handle_feed_message(&mut self, feed_msg: DydxWsFeedMessage) -> Vec<NautilusWsMessage> {
372        log::trace!(
373            "Handling feed message: {:?}",
374            std::mem::discriminant(&feed_msg)
375        );
376        match feed_msg {
377            DydxWsFeedMessage::Subaccounts(msg) => self.handle_subaccounts(msg),
378            DydxWsFeedMessage::Orderbook(msg) => self.handle_orderbook(msg),
379            DydxWsFeedMessage::Trades(msg) => self.handle_trades(msg),
380            DydxWsFeedMessage::Markets(msg) => self.handle_markets_feed(msg),
381            DydxWsFeedMessage::Candles(msg) => self.handle_candles_feed(msg),
382            DydxWsFeedMessage::ParentSubaccounts(msg) => self.handle_parent_subaccounts(msg),
383            DydxWsFeedMessage::BlockHeight(msg) => self.handle_block_height_feed(msg),
384        }
385    }
386
387    /// Handles subaccounts channel messages.
388    fn handle_subaccounts(&self, msg: DydxWsSubaccountsMessage) -> Vec<NautilusWsMessage> {
389        match msg {
390            DydxWsSubaccountsMessage::Subscribed(data) => {
391                let topic =
392                    self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(data.id.clone()));
393                self.subscriptions.confirm_subscribe(&topic);
394                self.process_subaccounts_subscribed(&data)
395            }
396            DydxWsSubaccountsMessage::ChannelData(data) => {
397                self.process_subaccounts_channel_data(data)
398            }
399            DydxWsSubaccountsMessage::Unsubscribed(data) => {
400                let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &data.id);
401                self.subscriptions.confirm_unsubscribe(&topic);
402                vec![]
403            }
404        }
405    }
406
407    /// Handles orderbook channel messages.
408    fn handle_orderbook(&mut self, msg: DydxWsOrderbookMessage) -> Vec<NautilusWsMessage> {
409        match msg {
410            DydxWsOrderbookMessage::Subscribed(data) => {
411                let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
412                self.subscriptions.confirm_subscribe(&topic);
413                // Reset sequence tracking on snapshot
414                if let Some(id) = &data.id {
415                    self.book_sequence.insert(id.clone(), data.message_id);
416                }
417                self.parse_orderbook_from_data(&data, true)
418            }
419            DydxWsOrderbookMessage::ChannelData(data) => {
420                if let Some(id) = &data.id {
421                    if let Some(last_id) = self.book_sequence.get(id)
422                        && data.message_id <= *last_id
423                    {
424                        log::warn!(
425                            "Orderbook sequence regression for {id}: last {last_id}, received {}",
426                            data.message_id
427                        );
428                    }
429                    self.book_sequence.insert(id.clone(), data.message_id);
430                }
431                self.parse_orderbook_from_data(&data, false)
432            }
433            DydxWsOrderbookMessage::ChannelBatchData(data) => {
434                if let Some(id) = &data.id {
435                    if let Some(last_id) = self.book_sequence.get(id)
436                        && data.message_id <= *last_id
437                    {
438                        log::warn!(
439                            "Orderbook batch sequence regression for {id}: last {last_id}, received {}",
440                            data.message_id
441                        );
442                    }
443                    self.book_sequence.insert(id.clone(), data.message_id);
444                }
445                self.parse_orderbook_batch_from_data(&data)
446            }
447            DydxWsOrderbookMessage::Unsubscribed(data) => {
448                let topic = self.topic_from_msg(&DydxWsChannel::Orderbook, &data.id);
449                self.subscriptions.confirm_unsubscribe(&topic);
450                if let Some(id) = &data.id {
451                    self.book_sequence.remove(id);
452                }
453                vec![]
454            }
455        }
456    }
457
458    /// Handles trades channel messages.
459    fn handle_trades(&self, msg: DydxWsTradesMessage) -> Vec<NautilusWsMessage> {
460        match msg {
461            DydxWsTradesMessage::Subscribed(data) => {
462                let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
463                self.subscriptions.confirm_subscribe(&topic);
464                self.parse_trades_from_data(&data)
465            }
466            DydxWsTradesMessage::ChannelData(data) => self.parse_trades_from_data(&data),
467            DydxWsTradesMessage::Unsubscribed(data) => {
468                let topic = self.topic_from_msg(&DydxWsChannel::Trades, &data.id);
469                self.subscriptions.confirm_unsubscribe(&topic);
470                vec![]
471            }
472        }
473    }
474
475    /// Handles markets channel messages.
476    fn handle_markets_feed(&self, msg: DydxWsMarketsMessage) -> Vec<NautilusWsMessage> {
477        match msg {
478            DydxWsMarketsMessage::Subscribed(data) => {
479                let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
480                self.subscriptions.confirm_subscribe(&topic);
481                self.parse_markets_from_data(&data)
482            }
483            DydxWsMarketsMessage::ChannelData(data) => self.parse_markets_from_data(&data),
484            DydxWsMarketsMessage::Unsubscribed(data) => {
485                let topic = self.topic_from_msg(&DydxWsChannel::Markets, &data.id);
486                self.subscriptions.confirm_unsubscribe(&topic);
487                vec![]
488            }
489        }
490    }
491
492    /// Handles candles channel messages.
493    ///
494    /// Subscribed contents is `{"candles": [...]}` (array wrapper), while
495    /// channel_data contents is a single candle object.
496    fn handle_candles_feed(&mut self, msg: DydxWsCandlesMessage) -> Vec<NautilusWsMessage> {
497        match msg {
498            DydxWsCandlesMessage::Subscribed(data) => {
499                let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
500                self.subscriptions.confirm_subscribe(&topic);
501                vec![]
502            }
503            DydxWsCandlesMessage::ChannelData(data) => self.parse_candles_from_data(&data),
504            DydxWsCandlesMessage::Unsubscribed(data) => {
505                let topic = self.topic_from_msg(&DydxWsChannel::Candles, &data.id);
506                self.subscriptions.confirm_unsubscribe(&topic);
507                vec![]
508            }
509        }
510    }
511
512    /// Handles parent subaccounts channel messages.
513    fn handle_parent_subaccounts(
514        &self,
515        msg: DydxWsParentSubaccountsMessage,
516    ) -> Vec<NautilusWsMessage> {
517        match msg {
518            DydxWsParentSubaccountsMessage::Subscribed(data) => {
519                let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
520                self.subscriptions.confirm_subscribe(&topic);
521                self.parse_parent_subaccounts_from_data(&data)
522            }
523            DydxWsParentSubaccountsMessage::ChannelData(data) => {
524                self.parse_parent_subaccounts_from_data(&data)
525            }
526            DydxWsParentSubaccountsMessage::Unsubscribed(data) => {
527                let topic = self.topic_from_msg(&DydxWsChannel::ParentSubaccounts, &data.id);
528                self.subscriptions.confirm_unsubscribe(&topic);
529                vec![]
530            }
531        }
532    }
533
534    /// Handles block height channel messages.
535    fn handle_block_height_feed(&self, msg: DydxWsBlockHeightMessage) -> Vec<NautilusWsMessage> {
536        match msg {
537            DydxWsBlockHeightMessage::Subscribed(data) => {
538                let topic =
539                    self.topic_from_msg(&DydxWsChannel::BlockHeight, &Some(data.id.clone()));
540                self.subscriptions.confirm_subscribe(&topic);
541                match data.contents.height.parse::<u64>() {
542                    Ok(height) => vec![NautilusWsMessage::BlockHeight {
543                        height,
544                        time: data.contents.time,
545                    }],
546                    Err(e) => {
547                        log::warn!("Failed to parse block height from subscription: {e}");
548                        vec![]
549                    }
550                }
551            }
552            DydxWsBlockHeightMessage::ChannelData(data) => {
553                match data.contents.block_height.parse::<u64>() {
554                    Ok(height) => vec![NautilusWsMessage::BlockHeight {
555                        height,
556                        time: data.contents.time,
557                    }],
558                    Err(e) => {
559                        log::warn!("Failed to parse block height from channel data: {e}");
560                        vec![]
561                    }
562                }
563            }
564            DydxWsBlockHeightMessage::Unsubscribed(data) => {
565                let topic = self.topic_from_msg(&DydxWsChannel::BlockHeight, &data.id);
566                self.subscriptions.confirm_unsubscribe(&topic);
567                vec![]
568            }
569        }
570    }
571
572    /// Processes subaccounts subscribed message.
573    fn process_subaccounts_subscribed(
574        &self,
575        msg: &DydxWsSubaccountsSubscribed,
576    ) -> Vec<NautilusWsMessage> {
577        log::debug!("Forwarding subaccount subscription to execution client");
578        vec![NautilusWsMessage::SubaccountSubscribed(Box::new(
579            msg.clone(),
580        ))]
581    }
582
583    /// Processes subaccounts channel data directly.
584    fn process_subaccounts_channel_data(
585        &self,
586        data: DydxWsSubaccountsChannelData,
587    ) -> Vec<NautilusWsMessage> {
588        let has_orders = data.contents.orders.as_ref().is_some_and(|o| !o.is_empty());
589        let has_fills = data.contents.fills.as_ref().is_some_and(|f| !f.is_empty());
590
591        if has_orders || has_fills {
592            log::debug!(
593                "Received {} order(s), {} fill(s) - forwarding to execution client",
594                data.contents.orders.as_ref().map_or(0, |o| o.len()),
595                data.contents.fills.as_ref().map_or(0, |f| f.len())
596            );
597            vec![NautilusWsMessage::SubaccountsChannelData(Box::new(data))]
598        } else {
599            vec![]
600        }
601    }
602
603    /// Parses trades from channel data message.
604    fn parse_trades_from_data(&self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
605        match self.parse_trades(data) {
606            Ok(msgs) => msgs,
607            Err(e) => {
608                log::error!("Error parsing trades: {e}");
609                vec![]
610            }
611        }
612    }
613
614    /// Parses orderbook from channel data message.
615    fn parse_orderbook_from_data(
616        &mut self,
617        data: &DydxWsChannelDataMsg,
618        is_snapshot: bool,
619    ) -> Vec<NautilusWsMessage> {
620        match self.parse_orderbook(data, is_snapshot) {
621            Ok(msgs) => msgs,
622            Err(e) => {
623                log::error!("Error parsing orderbook: {e}");
624                vec![]
625            }
626        }
627    }
628
629    /// Parses orderbook batch from batch data message.
630    fn parse_orderbook_batch_from_data(
631        &mut self,
632        data: &DydxWsChannelBatchDataMsg,
633    ) -> Vec<NautilusWsMessage> {
634        match self.parse_orderbook_batch(data) {
635            Ok(msgs) => msgs,
636            Err(e) => {
637                log::error!("Error parsing orderbook batch: {e}");
638                vec![]
639            }
640        }
641    }
642
643    /// Parses markets from channel data message.
644    fn parse_markets_from_data(&self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
645        match self.parse_markets(data) {
646            Ok(msgs) => msgs,
647            Err(e) => {
648                log::error!("Error parsing markets: {e}");
649                vec![]
650            }
651        }
652    }
653
654    /// Parses candles from channel data message (single candle object).
655    fn parse_candles_from_data(&mut self, data: &DydxWsChannelDataMsg) -> Vec<NautilusWsMessage> {
656        match self.parse_candles(data) {
657            Ok(msgs) => msgs,
658            Err(e) => {
659                log::error!("Error parsing candles: {e}");
660                vec![]
661            }
662        }
663    }
664
665    /// Parses parent subaccounts from channel data message.
666    fn parse_parent_subaccounts_from_data(
667        &self,
668        data: &DydxWsChannelDataMsg,
669    ) -> Vec<NautilusWsMessage> {
670        match self.parse_subaccounts(data) {
671            Ok(msgs) => msgs,
672            Err(e) => {
673                log::error!("Error parsing parent subaccounts: {e}");
674                vec![]
675            }
676        }
677    }
678
679    /// Handles a command to update the internal state.
680    async fn handle_command(&mut self, command: HandlerCommand) {
681        match command {
682            HandlerCommand::UpdateInstrument(instrument) => {
683                let symbol = instrument.id().symbol.inner();
684                self.instruments.insert(symbol, *instrument);
685            }
686            HandlerCommand::InitializeInstruments(instruments) => {
687                log::debug!(
688                    "Initializing {} instruments in WebSocket handler",
689                    instruments.len()
690                );
691                for instrument in instruments {
692                    let symbol = instrument.id().symbol.inner();
693                    self.instruments.insert(symbol, instrument);
694                }
695                log::debug!(
696                    "Handler now has {} instruments cached",
697                    self.instruments.len()
698                );
699            }
700            HandlerCommand::RegisterBarType { topic, bar_type } => {
701                self.bar_types.insert(topic, bar_type);
702            }
703            HandlerCommand::UnregisterBarType { topic } => {
704                self.bar_types.remove(&topic);
705            }
706            HandlerCommand::RegisterSubscription {
707                topic,
708                subscription,
709            } => {
710                self.subscription_messages.insert(topic, subscription);
711            }
712            HandlerCommand::UnregisterSubscription { topic } => {
713                self.subscription_messages.remove(&topic);
714            }
715            HandlerCommand::SendText(text) => {
716                if let Err(e) = self
717                    .send_with_retry(text, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
718                    .await
719                {
720                    log::error!("Failed to send WebSocket text after retries: {e}");
721                }
722            }
723        }
724    }
725
726    /// Registers a bar type for a specific topic (e.g., "BTC-USD/1MIN").
727    pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
728        self.bar_types.insert(topic, bar_type);
729    }
730
731    /// Unregisters a bar type for a specific topic.
732    pub fn unregister_bar_type(&mut self, topic: &str) {
733        self.bar_types.remove(topic);
734    }
735
736    fn topic_from_msg(&self, channel: &DydxWsChannel, id: &Option<String>) -> String {
737        match id {
738            Some(id) => format!(
739                "{}{}{}",
740                channel.as_ref(),
741                self.subscriptions.delimiter(),
742                id
743            ),
744            None => channel.as_ref().to_string(),
745        }
746    }
747
748    /// Clears transient state on reconnect.
749    ///
750    /// Preserves `instruments`, `bar_types`, and `subscription_messages` which are
751    /// needed for replay and parsing after reconnect.
752    fn clear_state(&mut self) {
753        let buffer_count = self.message_buffer.len();
754        let seq_count = self.book_sequence.len();
755        let bars_count = self.pending_bars.len();
756        self.message_buffer.clear();
757        self.book_sequence.clear();
758        self.pending_bars.clear();
759        log::debug!(
760            "Cleared reconnect state: message_buffer={buffer_count}, \
761             book_sequence={seq_count}, pending_bars={bars_count}"
762        );
763    }
764
765    async fn replay_subscriptions(&self) -> DydxWsResult<()> {
766        let topics = self.subscriptions.all_topics();
767        for topic in topics {
768            let Some(subscription) = self.subscription_messages.get(&topic).cloned() else {
769                log::warn!("No preserved subscription message for topic: {topic}");
770                continue;
771            };
772
773            let payload = serde_json::to_string(&subscription)?;
774            self.subscriptions.mark_subscribe(&topic);
775
776            if let Err(e) = self
777                .send_with_retry(payload, Some(DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
778                .await
779            {
780                self.subscriptions.mark_failure(&topic);
781                return Err(e);
782            }
783        }
784
785        Ok(())
786    }
787
788    /// Handles control messages from the fallback parsing path.
789    ///
790    /// Channel data is handled directly via `handle_feed_message()`.
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if the message cannot be processed.
795    #[allow(clippy::result_large_err)]
796    pub async fn handle_message(
797        &mut self,
798        msg: DydxWsMessage,
799    ) -> DydxWsResult<Vec<NautilusWsMessage>> {
800        match msg {
801            DydxWsMessage::Connected(_) => {
802                log::info!("dYdX WebSocket connected");
803                Ok(vec![])
804            }
805            DydxWsMessage::Subscribed(sub) => {
806                log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
807                let topic = self.topic_from_msg(&sub.channel, &sub.id);
808                self.subscriptions.confirm_subscribe(&topic);
809                Ok(vec![])
810            }
811            DydxWsMessage::SubaccountsSubscribed(msg) => {
812                log::debug!("Subaccounts subscribed with initial state (fallback path)");
813                let topic = self.topic_from_msg(&DydxWsChannel::Subaccounts, &Some(msg.id.clone()));
814                self.subscriptions.confirm_subscribe(&topic);
815                Ok(self.process_subaccounts_subscribed(&msg))
816            }
817            DydxWsMessage::Unsubscribed(unsub) => {
818                log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
819                let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
820                self.subscriptions.confirm_unsubscribe(&topic);
821                Ok(vec![])
822            }
823            DydxWsMessage::Error(err) => Ok(vec![NautilusWsMessage::Error(err)]),
824            DydxWsMessage::Reconnected => {
825                self.clear_state();
826                if let Err(e) = self.replay_subscriptions().await {
827                    log::error!("Failed to replay subscriptions after reconnect message: {e}");
828                }
829                Ok(vec![NautilusWsMessage::Reconnected])
830            }
831            DydxWsMessage::Pong => Ok(vec![]),
832            DydxWsMessage::Raw(_) => Ok(vec![]),
833        }
834    }
835
836    fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Vec<NautilusWsMessage>> {
837        let symbol = data
838            .id
839            .as_ref()
840            .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
841
842        let instrument_id = self.parse_instrument_id(symbol)?;
843        let instrument = self.get_instrument(&instrument_id)?;
844
845        let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
846            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
847
848        let ts_init = get_atomic_clock_realtime().get_time_ns();
849        let ticks = ws_parse::parse_trade_ticks(instrument_id, instrument, &contents, ts_init)?;
850
851        if ticks.is_empty() {
852            Ok(vec![])
853        } else {
854            Ok(vec![NautilusWsMessage::Data(ticks)])
855        }
856    }
857
858    fn parse_orderbook(
859        &mut self,
860        data: &DydxWsChannelDataMsg,
861        is_snapshot: bool,
862    ) -> DydxWsResult<Vec<NautilusWsMessage>> {
863        let symbol = data
864            .id
865            .as_ref()
866            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
867
868        let instrument_id = self.parse_instrument_id(symbol)?;
869        let instrument = self.get_instrument(&instrument_id)?;
870        let price_prec = instrument.price_precision();
871        let size_prec = instrument.size_precision();
872
873        let ts_init = get_atomic_clock_realtime().get_time_ns();
874
875        if is_snapshot {
876            let contents: DydxOrderbookSnapshotContents =
877                serde_json::from_value(data.contents.clone()).map_err(|e| {
878                    DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
879                })?;
880
881            let deltas = ws_parse::parse_orderbook_snapshot(
882                &instrument_id,
883                &contents,
884                price_prec,
885                size_prec,
886                ts_init,
887            )?;
888
889            Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
890        } else {
891            let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
892                .map_err(|e| {
893                    DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
894                })?;
895
896            let deltas = ws_parse::parse_orderbook_deltas(
897                &instrument_id,
898                &contents,
899                price_prec,
900                size_prec,
901                ts_init,
902            )?;
903
904            Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
905        }
906    }
907
908    fn parse_orderbook_batch(
909        &mut self,
910        data: &DydxWsChannelBatchDataMsg,
911    ) -> DydxWsResult<Vec<NautilusWsMessage>> {
912        let symbol = data
913            .id
914            .as_ref()
915            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
916
917        let instrument_id = self.parse_instrument_id(symbol)?;
918        let instrument = self.get_instrument(&instrument_id)?;
919        let price_prec = instrument.price_precision();
920        let size_prec = instrument.size_precision();
921
922        let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
923            .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
924
925        let ts_init = get_atomic_clock_realtime().get_time_ns();
926        let mut all_deltas = Vec::new();
927
928        let num_messages = contents.len();
929        for (idx, content) in contents.iter().enumerate() {
930            let is_last_message = idx == num_messages - 1;
931            let deltas = ws_parse::parse_orderbook_deltas_with_flag(
932                &instrument_id,
933                content,
934                price_prec,
935                size_prec,
936                ts_init,
937                is_last_message,
938            )?;
939            all_deltas.extend(deltas);
940        }
941
942        let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
943        Ok(vec![NautilusWsMessage::Deltas(Box::new(deltas))])
944    }
945
946    fn parse_candles(
947        &mut self,
948        data: &DydxWsChannelDataMsg,
949    ) -> DydxWsResult<Vec<NautilusWsMessage>> {
950        let topic = data
951            .id
952            .as_ref()
953            .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
954
955        let bar_type = *self.bar_types.get(topic).ok_or_else(|| {
956            DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
957        })?;
958
959        let candle: DydxCandle = serde_json::from_value(data.contents.clone())
960            .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
961
962        let instrument_id = self.parse_instrument_id(&candle.ticker)?;
963        let instrument = self.get_instrument(&instrument_id)?;
964
965        let ts_init = get_atomic_clock_realtime().get_time_ns();
966        let bar = ws_parse::parse_candle_bar(
967            bar_type,
968            instrument,
969            &candle,
970            self.bars_timestamp_on_close,
971            ts_init,
972        )?;
973
974        // Emit-on-next: only emit a bar when a new candle period arrives,
975        // confirming the previous bar is closed
976        let topic_key = topic.clone();
977        if let Some(pending) = self.pending_bars.get(&topic_key) {
978            if pending.ts_event != bar.ts_event {
979                // New candle period - emit the previous (now closed) bar
980                let closed_bar = *pending;
981                self.pending_bars.insert(topic_key, bar);
982                return Ok(vec![NautilusWsMessage::Data(vec![Data::Bar(closed_bar)])]);
983            }
984            // Same candle period - update pending with latest values
985            self.pending_bars.insert(topic_key, bar);
986            return Ok(vec![]);
987        }
988
989        // No pending bar yet - store as pending, emit nothing
990        self.pending_bars.insert(topic_key, bar);
991        Ok(vec![])
992    }
993
994    fn parse_markets(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Vec<NautilusWsMessage>> {
995        let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
996            .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
997
998        let mut messages = Vec::new();
999        let ts_init = get_atomic_clock_realtime().get_time_ns();
1000
1001        // Parse oracle prices → MarkPriceUpdate + IndexPriceUpdate
1002        if let Some(oracle_prices) = &contents.oracle_prices {
1003            for (symbol_str, oracle_market) in oracle_prices {
1004                let Ok(instrument_id) = self.parse_instrument_id(symbol_str) else {
1005                    continue;
1006                };
1007                let Some(instrument) = self.instruments.get(&instrument_id.symbol.inner()) else {
1008                    continue;
1009                };
1010                let Ok(oracle_price_dec) = oracle_market.oracle_price.parse::<Decimal>() else {
1011                    log::error!("Failed to parse oracle price: market={symbol_str}");
1012                    continue;
1013                };
1014                let Ok(price) =
1015                    Price::from_decimal_dp(oracle_price_dec, instrument.price_precision())
1016                else {
1017                    log::error!("Failed to create Price: market={symbol_str}");
1018                    continue;
1019                };
1020
1021                messages.push(NautilusWsMessage::MarkPrice(MarkPriceUpdate::new(
1022                    instrument_id,
1023                    price,
1024                    ts_init,
1025                    ts_init,
1026                )));
1027                messages.push(NautilusWsMessage::IndexPrice(IndexPriceUpdate::new(
1028                    instrument_id,
1029                    price,
1030                    ts_init,
1031                    ts_init,
1032                )));
1033            }
1034        }
1035
1036        // Parse trading data → FundingRateUpdate (and detect new instruments)
1037        if let Some(trading) = &contents.trading {
1038            for (symbol_str, trading_data) in trading {
1039                let Ok(instrument_id) = self.parse_instrument_id(symbol_str) else {
1040                    continue;
1041                };
1042
1043                // Check if this is a new instrument not in our cache
1044                if !self.instruments.contains_key(&instrument_id.symbol.inner()) {
1045                    log::info!("New instrument discovered via WebSocket: {symbol_str}");
1046                    messages.push(NautilusWsMessage::NewInstrumentDiscovered {
1047                        ticker: symbol_str.clone(),
1048                    });
1049                    continue;
1050                }
1051
1052                // Existing instrument - emit funding rate if available
1053                let Some(rate_str) = &trading_data.next_funding_rate else {
1054                    continue;
1055                };
1056                let Ok(rate) = rate_str.parse::<Decimal>() else {
1057                    log::error!(
1058                        "Failed to parse funding rate: market={symbol_str}, rate={rate_str}"
1059                    );
1060                    continue;
1061                };
1062
1063                messages.push(NautilusWsMessage::FundingRate(FundingRateUpdate::new(
1064                    instrument_id,
1065                    rate,
1066                    None,
1067                    ts_init,
1068                    ts_init,
1069                )));
1070            }
1071        }
1072
1073        Ok(messages)
1074    }
1075
1076    fn parse_subaccounts(
1077        &self,
1078        data: &DydxWsChannelDataMsg,
1079    ) -> DydxWsResult<Vec<NautilusWsMessage>> {
1080        log::debug!(
1081            "Parsing subaccounts channel data (msg_type={:?})",
1082            data.msg_type
1083        );
1084        let contents: DydxWsSubaccountsChannelContents =
1085            serde_json::from_value(data.contents.clone()).map_err(|e| {
1086                DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1087            })?;
1088
1089        let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1090        let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1091
1092        if has_orders || has_fills {
1093            // Forward raw channel data to execution client for parsing
1094            // The execution client has the clob_pair_id and instrument mappings needed
1095            log::debug!(
1096                "Received {} order(s), {} fill(s) - forwarding to execution client",
1097                contents.orders.as_ref().map_or(0, |o| o.len()),
1098                contents.fills.as_ref().map_or(0, |f| f.len())
1099            );
1100
1101            let channel_data = DydxWsSubaccountsChannelData {
1102                connection_id: data.connection_id.clone(),
1103                message_id: data.message_id,
1104                id: data.id.clone().unwrap_or_default(),
1105                version: data.version.clone().unwrap_or_default(),
1106                contents,
1107            };
1108
1109            return Ok(vec![NautilusWsMessage::SubaccountsChannelData(Box::new(
1110                channel_data,
1111            ))]);
1112        }
1113
1114        Ok(vec![])
1115    }
1116
1117    fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1118        // dYdX WS uses raw symbols (e.g., "BTC-USD")
1119        // Need to append "-PERP" to match Nautilus instrument IDs
1120        let symbol_with_perp = format!("{symbol}-PERP");
1121        Ok(parse_instrument_id(&symbol_with_perp))
1122    }
1123
1124    fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1125        self.instruments
1126            .get(&instrument_id.symbol.inner())
1127            .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1128    }
1129}
1130
1131/// Determines if a dYdX WebSocket error should trigger a retry.
1132fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1133    match error {
1134        DydxWsError::Transport(_) => true,
1135        DydxWsError::Send(_) => true,
1136        DydxWsError::ClientError(msg) => {
1137            let msg_lower = msg.to_lowercase();
1138            msg_lower.contains("timeout")
1139                || msg_lower.contains("timed out")
1140                || msg_lower.contains("connection")
1141                || msg_lower.contains("network")
1142        }
1143        DydxWsError::NotConnected
1144        | DydxWsError::Json(_)
1145        | DydxWsError::Parse(_)
1146        | DydxWsError::Authentication(_)
1147        | DydxWsError::Subscription(_)
1148        | DydxWsError::Venue(_) => false,
1149    }
1150}
1151
1152/// Creates a timeout error for the retry manager.
1153fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1154    DydxWsError::ClientError(msg)
1155}