nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! This module processes incoming WebSocket messages and converts them into
19//! Nautilus domain objects.
20//!
21//! The handler owns the WebSocketClient exclusively and runs in a dedicated
22//! Tokio task within the lock-free I/O boundary.
23
24use std::{
25    fmt::{Debug, Formatter},
26    str::FromStr,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36    data::{
37        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38        bar::get_bar_interval_ns,
39    },
40    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41    identifiers::{AccountId, InstrumentId, TradeId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{
46    RECONNECTED,
47    retry::{RetryManager, create_websocket_retry_manager},
48    websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55    DydxWsError, DydxWsResult,
56    client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57    enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58    error::DydxWebSocketError,
59    messages::{
60        DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
61        DydxTradeContents, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
62        DydxWsGenericMsg, DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData,
63        DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg,
64    },
65};
66use crate::common::parse::parse_instrument_id;
67
/// Commands sent to the feed handler.
///
/// Commands arrive over the handler's command channel and are applied by
/// `FeedHandler::handle_command`, which either mutates the handler's caches
/// or forwards text to the WebSocket.
#[derive(Debug, Clone)]
pub enum HandlerCommand {
    /// Update a single instrument in the cache.
    ///
    /// The instrument is stored under its symbol, replacing any entry already
    /// cached for that symbol.
    UpdateInstrument(Box<InstrumentAny>),
    /// Initialize instruments in bulk.
    ///
    /// Each instrument is stored under its symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Register a bar type for candle subscriptions.
    ///
    /// The bar type is cached under `topic`.
    RegisterBarType { topic: String, bar_type: BarType },
    /// Unregister a bar type for candle subscriptions.
    ///
    /// Removes the cache entry stored under `topic`, if any.
    UnregisterBarType { topic: String },
    /// Send a text message via WebSocket.
    SendText(String),
}
82
/// Processes incoming WebSocket messages and converts them to Nautilus domain objects.
///
/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
/// eliminating RwLock contention on the hot path.
///
/// Inputs arrive on two channels (`cmd_rx` for commands, `raw_rx` for raw
/// WebSocket frames); converted messages leave through `out_tx`.
pub struct FeedHandler {
    /// Account ID for parsing account-specific messages.
    account_id: Option<AccountId>,
    /// Command receiver from outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Output sender for Nautilus messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Raw WebSocket message receiver.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Owned WebSocket client (no RwLock).
    client: WebSocketClient,
    /// Manual disconnect signal.
    ///
    /// Polled by the processing loop after each handled event; `true` stops the loop.
    signal: Arc<AtomicBool>,
    /// Retry manager for WebSocket send operations.
    retry_manager: RetryManager<DydxWsError>,
    /// Cached instruments for parsing market data.
    ///
    /// Keyed by the instrument's symbol.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Cached bar types by topic (e.g., "BTC-USD/1MIN").
    bar_types: AHashMap<String, BarType>,
    /// Subscription state shared with the outer client for replay/acks.
    subscriptions: SubscriptionState,
}
109
110impl Debug for FeedHandler {
111    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("FeedHandler")
113            .field("account_id", &self.account_id)
114            .field("instruments_count", &self.instruments.len())
115            .field("bar_types_count", &self.bar_types.len())
116            .finish_non_exhaustive()
117    }
118}
119
120impl FeedHandler {
121    /// Creates a new [`FeedHandler`].
122    #[must_use]
123    pub fn new(
124        account_id: Option<AccountId>,
125        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
126        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
127        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
128        client: WebSocketClient,
129        signal: Arc<AtomicBool>,
130        subscriptions: SubscriptionState,
131    ) -> Self {
132        Self {
133            account_id,
134            cmd_rx,
135            out_tx,
136            raw_rx,
137            client,
138            signal,
139            retry_manager: create_websocket_retry_manager(),
140            instruments: AHashMap::new(),
141            bar_types: AHashMap::new(),
142            subscriptions,
143        }
144    }
145
146    /// Sends a WebSocket message with retry logic.
147    ///
148    /// Uses the configured [`RetryManager`] to handle transient failures.
149    async fn send_with_retry(
150        &self,
151        payload: String,
152        rate_limit_keys: Option<Vec<String>>,
153    ) -> Result<(), DydxWsError> {
154        self.retry_manager
155            .execute_with_retry(
156                "websocket_send",
157                || {
158                    let payload = payload.clone();
159                    let keys = rate_limit_keys.clone();
160                    async move {
161                        self.client
162                            .send_text(payload, keys)
163                            .await
164                            .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
165                    }
166                },
167                should_retry_dydx_error,
168                create_dydx_timeout_error,
169            )
170            .await
171    }
172
173    /// Main processing loop for the handler.
174    pub async fn run(&mut self) {
175        loop {
176            tokio::select! {
177                // Process commands from outer client
178                Some(cmd) = self.cmd_rx.recv() => {
179                    self.handle_command(cmd).await;
180                }
181
182                // Process raw WebSocket messages
183                Some(msg) = self.raw_rx.recv() => {
184                    if let Some(nautilus_msg) = self.process_raw_message(msg).await
185                        && self.out_tx.send(nautilus_msg).is_err()
186                    {
187                        tracing::debug!("Receiver dropped, stopping handler");
188                        break;
189                    }
190                }
191
192                else => {
193                    tracing::debug!("Handler shutting down: channels closed");
194                    break;
195                }
196            }
197
198            // Check for stop signal
199            if self.signal.load(Ordering::Relaxed) {
200                tracing::debug!("Handler received stop signal");
201                break;
202            }
203        }
204    }
205
206    /// Processes a raw WebSocket message.
207    async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
208        match msg {
209            Message::Text(txt) => {
210                if txt == RECONNECTED {
211                    if let Err(e) = self.replay_subscriptions().await {
212                        tracing::error!("Failed to replay subscriptions after reconnect: {e}");
213                    }
214                    return Some(NautilusWsMessage::Reconnected);
215                }
216
217                match serde_json::from_str::<serde_json::Value>(&txt) {
218                    Ok(val) => {
219                        // Attempt to classify message using generic envelope
220                        match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
221                            Ok(meta) => {
222                                let result = if meta.is_connected() {
223                                    serde_json::from_value::<DydxWsConnectedMsg>(val)
224                                        .map(DydxWsMessage::Connected)
225                                } else if meta.is_subscribed() {
226                                    // Check if this is a subaccounts subscription with initial state
227                                    if let Ok(sub_msg) =
228                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
229                                    {
230                                        if sub_msg.channel == DydxWsChannel::Subaccounts {
231                                            // Parse as subaccounts-specific subscription message
232                                            serde_json::from_value::<DydxWsSubaccountsSubscribed>(
233                                                val,
234                                            )
235                                            .map(DydxWsMessage::SubaccountsSubscribed)
236                                        } else {
237                                            Ok(DydxWsMessage::Subscribed(sub_msg))
238                                        }
239                                    } else {
240                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val)
241                                            .map(DydxWsMessage::Subscribed)
242                                    }
243                                } else if meta.is_unsubscribed() {
244                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
245                                        .map(DydxWsMessage::Unsubscribed)
246                                } else if meta.is_channel_data() {
247                                    serde_json::from_value::<DydxWsChannelDataMsg>(val)
248                                        .map(DydxWsMessage::ChannelData)
249                                } else if meta.is_channel_batch_data() {
250                                    serde_json::from_value::<DydxWsChannelBatchDataMsg>(val)
251                                        .map(DydxWsMessage::ChannelBatchData)
252                                } else if meta.is_error() {
253                                    serde_json::from_value::<DydxWebSocketError>(val)
254                                        .map(DydxWsMessage::Error)
255                                } else if meta.is_unknown() {
256                                    tracing::debug!("Received unknown WebSocket message type");
257                                    Ok(DydxWsMessage::Raw(val))
258                                } else {
259                                    Ok(DydxWsMessage::Raw(val))
260                                };
261
262                                match result {
263                                    Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
264                                    Err(e) => {
265                                        tracing::warn!("Failed to parse WebSocket message: {e}");
266                                        None
267                                    }
268                                }
269                            }
270                            Err(_) => {
271                                // Fallback to raw if generic parse fails
272                                None
273                            }
274                        }
275                    }
276                    Err(e) => {
277                        let err = DydxWebSocketError::from_message(e.to_string());
278                        Some(NautilusWsMessage::Error(err))
279                    }
280                }
281            }
282            Message::Pong(_data) => None,
283            Message::Ping(_data) => None,  // Handled by lower layers
284            Message::Binary(_bin) => None, // dYdX uses text frames
285            Message::Close(_frame) => {
286                tracing::info!("WebSocket close frame received");
287                None
288            }
289            Message::Frame(_) => None,
290        }
291    }
292
293    /// Handles a parsed dYdX WebSocket message.
294    async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
295        match self.handle_message(msg).await {
296            Ok(opt_msg) => opt_msg,
297            Err(e) => {
298                tracing::error!("Error handling message: {e}");
299                None
300            }
301        }
302    }
303
304    /// Handles a command to update the internal state.
305    async fn handle_command(&mut self, command: HandlerCommand) {
306        match command {
307            HandlerCommand::UpdateInstrument(instrument) => {
308                let symbol = instrument.id().symbol.inner();
309                self.instruments.insert(symbol, *instrument);
310            }
311            HandlerCommand::InitializeInstruments(instruments) => {
312                for instrument in instruments {
313                    let symbol = instrument.id().symbol.inner();
314                    self.instruments.insert(symbol, instrument);
315                }
316            }
317            HandlerCommand::RegisterBarType { topic, bar_type } => {
318                self.bar_types.insert(topic, bar_type);
319            }
320            HandlerCommand::UnregisterBarType { topic } => {
321                self.bar_types.remove(&topic);
322            }
323            HandlerCommand::SendText(text) => {
324                if let Err(e) = self
325                    .send_with_retry(
326                        text,
327                        Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
328                    )
329                    .await
330                {
331                    tracing::error!("Failed to send WebSocket text after retries: {e}");
332                }
333            }
334        }
335    }
336
337    /// Registers a bar type for a specific topic (e.g., "BTC-USD/1MIN").
338    pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
339        self.bar_types.insert(topic, bar_type);
340    }
341
342    /// Unregisters a bar type for a specific topic.
343    pub fn unregister_bar_type(&mut self, topic: &str) {
344        self.bar_types.remove(topic);
345    }
346
347    fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
348        match id {
349            Some(id) => format!(
350                "{}{}{}",
351                channel.as_ref(),
352                self.subscriptions.delimiter(),
353                id
354            ),
355            None => channel.as_ref().to_string(),
356        }
357    }
358
359    fn subscription_from_topic(
360        &self,
361        topic: &str,
362        op: super::enums::DydxWsOperation,
363    ) -> Option<super::messages::DydxSubscription> {
364        let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
365            topic,
366            self.subscriptions.delimiter(),
367        );
368        let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
369        let id = symbol.map(std::string::ToString::to_string);
370
371        Some(super::messages::DydxSubscription { op, channel, id })
372    }
373
374    async fn replay_subscriptions(&self) -> DydxWsResult<()> {
375        let topics = self.subscriptions.all_topics();
376        for topic in topics {
377            let Some(subscription) =
378                self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
379            else {
380                tracing::warn!("Failed to reconstruct subscription from topic: {topic}");
381                continue;
382            };
383
384            let payload = serde_json::to_string(&subscription)?;
385            self.subscriptions.mark_subscribe(&topic);
386
387            if let Err(e) = self
388                .send_with_retry(
389                    payload,
390                    Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
391                )
392                .await
393            {
394                self.subscriptions.mark_failure(&topic);
395                return Err(e);
396            }
397        }
398
399        Ok(())
400    }
401
402    /// Processes a WebSocket message and converts it to Nautilus domain objects.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if message parsing fails.
407    #[allow(clippy::result_large_err)]
408    pub async fn handle_message(
409        &self,
410        msg: DydxWsMessage,
411    ) -> DydxWsResult<Option<NautilusWsMessage>> {
412        match msg {
413            DydxWsMessage::Connected(_) => {
414                tracing::info!("dYdX WebSocket connected");
415                Ok(None)
416            }
417            DydxWsMessage::Subscribed(sub) => {
418                tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
419                let topic = self.topic_from_msg(&sub.channel, &sub.id);
420                self.subscriptions.confirm_subscribe(&topic);
421                Ok(None)
422            }
423            DydxWsMessage::SubaccountsSubscribed(msg) => {
424                tracing::debug!("Subaccounts subscribed with initial state");
425                let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
426                self.subscriptions.confirm_subscribe(&topic);
427                self.parse_subaccounts_subscribed(&msg)
428            }
429            DydxWsMessage::Unsubscribed(unsub) => {
430                tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
431                let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
432                self.subscriptions.confirm_unsubscribe(&topic);
433                Ok(None)
434            }
435            DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
436            DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
437            DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
438            DydxWsMessage::Reconnected => {
439                if let Err(e) = self.replay_subscriptions().await {
440                    tracing::error!("Failed to replay subscriptions after reconnect message: {e}");
441                }
442                Ok(Some(NautilusWsMessage::Reconnected))
443            }
444            DydxWsMessage::Pong => Ok(None),
445            DydxWsMessage::Raw(_) => Ok(None),
446        }
447    }
448
449    fn handle_channel_data(
450        &self,
451        data: DydxWsChannelDataMsg,
452    ) -> DydxWsResult<Option<NautilusWsMessage>> {
453        match data.channel {
454            DydxWsChannel::Trades => self.parse_trades(&data),
455            DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
456            DydxWsChannel::Candles => self.parse_candles(&data),
457            DydxWsChannel::Markets => self.parse_markets(&data),
458            DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
459                self.parse_subaccounts(&data)
460            }
461            DydxWsChannel::BlockHeight => {
462                tracing::debug!("Block height update received");
463                Ok(None)
464            }
465            DydxWsChannel::Unknown => {
466                tracing::debug!("Unknown channel data received");
467                Ok(None)
468            }
469        }
470    }
471
472    fn handle_channel_batch_data(
473        &self,
474        data: DydxWsChannelBatchDataMsg,
475    ) -> DydxWsResult<Option<NautilusWsMessage>> {
476        match data.channel {
477            DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
478            _ => {
479                tracing::warn!("Unexpected batch data for channel: {:?}", data.channel);
480                Ok(None)
481            }
482        }
483    }
484
485    fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
486        let symbol = data
487            .id
488            .as_ref()
489            .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
490
491        let instrument_id = self.parse_instrument_id(symbol)?;
492        let instrument = self.get_instrument(&instrument_id)?;
493
494        let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
495            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
496
497        let mut ticks = Vec::new();
498        let ts_init = get_atomic_clock_realtime().get_time_ns();
499
500        for trade in contents.trades {
501            let aggressor_side = match trade.side {
502                OrderSide::Buy => AggressorSide::Buyer,
503                OrderSide::Sell => AggressorSide::Seller,
504                _ => continue, // Skip NoOrderSide
505            };
506
507            let price = Decimal::from_str(&trade.price)
508                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
509
510            let size = Decimal::from_str(&trade.size)
511                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
512
513            let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
514                DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
515            })?;
516
517            let tick = TradeTick::new(
518                instrument_id,
519                Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
520                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
521                })?,
522                Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
523                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
524                })?,
525                aggressor_side,
526                TradeId::new(&trade.id),
527                UnixNanos::from(trade_ts as u64),
528                ts_init,
529            );
530            ticks.push(Data::Trade(tick));
531        }
532
533        if ticks.is_empty() {
534            Ok(None)
535        } else {
536            Ok(Some(NautilusWsMessage::Data(ticks)))
537        }
538    }
539
540    fn parse_orderbook(
541        &self,
542        data: &DydxWsChannelDataMsg,
543        is_snapshot: bool,
544    ) -> DydxWsResult<Option<NautilusWsMessage>> {
545        let symbol = data
546            .id
547            .as_ref()
548            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
549
550        let instrument_id = self.parse_instrument_id(symbol)?;
551        let instrument = self.get_instrument(&instrument_id)?;
552
553        let ts_init = get_atomic_clock_realtime().get_time_ns();
554
555        if is_snapshot {
556            let contents: DydxOrderbookSnapshotContents =
557                serde_json::from_value(data.contents.clone()).map_err(|e| {
558                    DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
559                })?;
560
561            let deltas = self.parse_orderbook_snapshot(
562                &instrument_id,
563                &contents,
564                instrument.price_precision(),
565                instrument.size_precision(),
566                ts_init,
567            )?;
568
569            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
570        } else {
571            let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
572                .map_err(|e| {
573                    DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
574                })?;
575
576            let deltas = self.parse_orderbook_deltas(
577                &instrument_id,
578                &contents,
579                instrument.price_precision(),
580                instrument.size_precision(),
581                ts_init,
582            )?;
583
584            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
585        }
586    }
587
588    fn parse_orderbook_batch(
589        &self,
590        data: &DydxWsChannelBatchDataMsg,
591    ) -> DydxWsResult<Option<NautilusWsMessage>> {
592        let symbol = data
593            .id
594            .as_ref()
595            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
596
597        let instrument_id = self.parse_instrument_id(symbol)?;
598        let instrument = self.get_instrument(&instrument_id)?;
599
600        let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
601            .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
602
603        let ts_init = get_atomic_clock_realtime().get_time_ns();
604        let mut all_deltas = Vec::new();
605
606        let num_messages = contents.len();
607        for (idx, content) in contents.iter().enumerate() {
608            let is_last_message = idx == num_messages - 1;
609            let deltas = self.parse_orderbook_deltas_with_flag(
610                &instrument_id,
611                content,
612                instrument.price_precision(),
613                instrument.size_precision(),
614                ts_init,
615                is_last_message,
616            )?;
617            all_deltas.extend(deltas);
618        }
619
620        let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
621        Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
622    }
623
624    fn parse_orderbook_snapshot(
625        &self,
626        instrument_id: &InstrumentId,
627        contents: &DydxOrderbookSnapshotContents,
628        price_precision: u8,
629        size_precision: u8,
630        ts_init: UnixNanos,
631    ) -> DydxWsResult<OrderBookDeltas> {
632        let mut deltas = Vec::new();
633
634        // Add clear delta first
635        deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
636
637        let bids = contents.bids.as_deref().unwrap_or(&[]);
638        let asks = contents.asks.as_deref().unwrap_or(&[]);
639
640        let bids_len = bids.len();
641        let asks_len = asks.len();
642
643        for (idx, bid) in bids.iter().enumerate() {
644            let is_last = idx == bids_len - 1 && asks_len == 0;
645            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
646
647            let price = Decimal::from_str(&bid.price)
648                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
649
650            let size = Decimal::from_str(&bid.size)
651                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
652
653            let order = BookOrder::new(
654                OrderSide::Buy,
655                Price::from_decimal_dp(price, price_precision).map_err(|e| {
656                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
657                })?,
658                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
659                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
660                })?,
661                0,
662            );
663
664            deltas.push(OrderBookDelta::new(
665                *instrument_id,
666                BookAction::Add,
667                order,
668                flags,
669                0,
670                ts_init,
671                ts_init,
672            ));
673        }
674
675        for (idx, ask) in asks.iter().enumerate() {
676            let is_last = idx == asks_len - 1;
677            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
678
679            let price = Decimal::from_str(&ask.price)
680                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
681
682            let size = Decimal::from_str(&ask.size)
683                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
684
685            let order = BookOrder::new(
686                OrderSide::Sell,
687                Price::from_decimal_dp(price, price_precision).map_err(|e| {
688                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
689                })?,
690                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
691                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
692                })?,
693                0,
694            );
695
696            deltas.push(OrderBookDelta::new(
697                *instrument_id,
698                BookAction::Add,
699                order,
700                flags,
701                0,
702                ts_init,
703                ts_init,
704            ));
705        }
706
707        Ok(OrderBookDeltas::new(*instrument_id, deltas))
708    }
709
710    fn parse_orderbook_deltas(
711        &self,
712        instrument_id: &InstrumentId,
713        contents: &DydxOrderbookContents,
714        price_precision: u8,
715        size_precision: u8,
716        ts_init: UnixNanos,
717    ) -> DydxWsResult<OrderBookDeltas> {
718        let deltas = self.parse_orderbook_deltas_with_flag(
719            instrument_id,
720            contents,
721            price_precision,
722            size_precision,
723            ts_init,
724            true, // Mark as last message by default
725        )?;
726        Ok(OrderBookDeltas::new(*instrument_id, deltas))
727    }
728
729    #[allow(clippy::too_many_arguments)]
730    fn parse_orderbook_deltas_with_flag(
731        &self,
732        instrument_id: &InstrumentId,
733        contents: &DydxOrderbookContents,
734        price_precision: u8,
735        size_precision: u8,
736        ts_init: UnixNanos,
737        is_last_message: bool,
738    ) -> DydxWsResult<Vec<OrderBookDelta>> {
739        let mut deltas = Vec::new();
740
741        let bids = contents.bids.as_deref().unwrap_or(&[]);
742        let asks = contents.asks.as_deref().unwrap_or(&[]);
743
744        let bids_len = bids.len();
745        let asks_len = asks.len();
746
747        for (idx, (price_str, size_str)) in bids.iter().enumerate() {
748            let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
749            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
750
751            let price = Decimal::from_str(price_str)
752                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
753
754            let size = Decimal::from_str(size_str)
755                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
756
757            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
758                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
759            })?;
760            let action = if qty.is_zero() {
761                BookAction::Delete
762            } else {
763                BookAction::Update
764            };
765
766            let order = BookOrder::new(
767                OrderSide::Buy,
768                Price::from_decimal_dp(price, price_precision).map_err(|e| {
769                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
770                })?,
771                qty,
772                0,
773            );
774
775            deltas.push(OrderBookDelta::new(
776                *instrument_id,
777                action,
778                order,
779                flags,
780                0,
781                ts_init,
782                ts_init,
783            ));
784        }
785
786        for (idx, (price_str, size_str)) in asks.iter().enumerate() {
787            let is_last = is_last_message && idx == asks_len - 1;
788            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
789
790            let price = Decimal::from_str(price_str)
791                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
792
793            let size = Decimal::from_str(size_str)
794                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
795
796            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
797                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
798            })?;
799            let action = if qty.is_zero() {
800                BookAction::Delete
801            } else {
802                BookAction::Update
803            };
804
805            let order = BookOrder::new(
806                OrderSide::Sell,
807                Price::from_decimal_dp(price, price_precision).map_err(|e| {
808                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
809                })?,
810                qty,
811                0,
812            );
813
814            deltas.push(OrderBookDelta::new(
815                *instrument_id,
816                action,
817                order,
818                flags,
819                0,
820                ts_init,
821                ts_init,
822            ));
823        }
824
825        Ok(deltas)
826    }
827
828    fn parse_candles(
829        &self,
830        data: &DydxWsChannelDataMsg,
831    ) -> DydxWsResult<Option<NautilusWsMessage>> {
832        let topic = data
833            .id
834            .as_ref()
835            .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
836
837        let bar_type = self.bar_types.get(topic).ok_or_else(|| {
838            DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
839        })?;
840
841        let candle: DydxCandle = serde_json::from_value(data.contents.clone())
842            .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
843
844        let instrument_id = self.parse_instrument_id(&candle.ticker)?;
845        let instrument = self.get_instrument(&instrument_id)?;
846
847        let open = Decimal::from_str(&candle.open)
848            .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
849        let high = Decimal::from_str(&candle.high)
850            .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
851        let low = Decimal::from_str(&candle.low)
852            .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
853        let close = Decimal::from_str(&candle.close)
854            .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
855        let volume = Decimal::from_str(&candle.base_token_volume)
856            .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
857
858        let ts_init = get_atomic_clock_realtime().get_time_ns();
859
860        // Calculate ts_event: startedAt + interval
861        let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
862            DydxWsError::Parse(format!(
863                "Timestamp out of range for candle at {}",
864                candle.started_at
865            ))
866        })?;
867        let interval_nanos = get_bar_interval_ns(bar_type);
868        let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
869
870        let bar = Bar::new(
871            *bar_type,
872            Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
873                DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
874            })?,
875            Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
876                DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
877            })?,
878            Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
879                DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
880            })?,
881            Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
882                DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
883            })?,
884            Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
885                DydxWsError::Parse(format!(
886                    "Failed to create volume Quantity from decimal: {e}"
887                ))
888            })?,
889            ts_event,
890            ts_init,
891        );
892
893        Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
894    }
895
896    fn parse_markets(
897        &self,
898        data: &DydxWsChannelDataMsg,
899    ) -> DydxWsResult<Option<NautilusWsMessage>> {
900        let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
901            .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
902
903        // Markets channel provides oracle price updates needed for margin calculations
904        // Forward to execution client to update oracle_prices map
905        if let Some(oracle_prices) = contents.oracle_prices {
906            tracing::debug!(
907                "Forwarding oracle price updates for {} markets to execution client",
908                oracle_prices.len()
909            );
910            return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
911        }
912
913        Ok(None)
914    }
915
916    fn parse_subaccounts(
917        &self,
918        data: &DydxWsChannelDataMsg,
919    ) -> DydxWsResult<Option<NautilusWsMessage>> {
920        let contents: DydxWsSubaccountsChannelContents =
921            serde_json::from_value(data.contents.clone()).map_err(|e| {
922                DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
923            })?;
924
925        // Check if we have any orders or fills
926        let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
927        let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
928
929        if has_orders || has_fills {
930            // Forward raw channel data to execution client for parsing
931            // The execution client has the clob_pair_id and instrument mappings needed
932            tracing::debug!(
933                "Received {} order(s), {} fill(s) - forwarding to execution client",
934                contents.orders.as_ref().map_or(0, |o| o.len()),
935                contents.fills.as_ref().map_or(0, |f| f.len())
936            );
937
938            let channel_data = DydxWsSubaccountsChannelData {
939                msg_type: data.msg_type,
940                connection_id: data.connection_id.clone(),
941                message_id: data.message_id,
942                id: data.id.clone().unwrap_or_default(),
943                channel: data.channel,
944                version: data.version.clone().unwrap_or_default(),
945                contents,
946            };
947
948            return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
949                channel_data,
950            ))));
951        }
952
953        Ok(None)
954    }
955
956    fn parse_subaccounts_subscribed(
957        &self,
958        msg: &DydxWsSubaccountsSubscribed,
959    ) -> DydxWsResult<Option<NautilusWsMessage>> {
960        // Pass raw subaccount subscription to execution client for parsing
961        // The execution client has access to instruments and oracle prices needed for margin calculations
962        tracing::debug!("Forwarding subaccount subscription to execution client");
963        Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
964            msg.clone(),
965        ))))
966    }
967
968    fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
969        // dYdX WS uses raw symbols (e.g., "BTC-USD")
970        // Need to append "-PERP" to match Nautilus instrument IDs
971        let symbol_with_perp = format!("{symbol}-PERP");
972        Ok(parse_instrument_id(&symbol_with_perp))
973    }
974
975    fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
976        self.instruments
977            .get(&instrument_id.symbol.inner())
978            .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
979    }
980}
981
982/// Determines if a dYdX WebSocket error should trigger a retry.
983fn should_retry_dydx_error(error: &DydxWsError) -> bool {
984    match error {
985        DydxWsError::Transport(_) => true,
986        DydxWsError::Send(_) => true,
987        DydxWsError::ClientError(msg) => {
988            let msg_lower = msg.to_lowercase();
989            msg_lower.contains("timeout")
990                || msg_lower.contains("timed out")
991                || msg_lower.contains("connection")
992                || msg_lower.contains("network")
993        }
994        DydxWsError::NotConnected
995        | DydxWsError::Json(_)
996        | DydxWsError::Parse(_)
997        | DydxWsError::Authentication(_)
998        | DydxWsError::Subscription(_)
999        | DydxWsError::Venue(_) => false,
1000    }
1001}
1002
1003/// Creates a timeout error for the retry manager.
1004fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1005    DydxWsError::ClientError(msg)
1006}