nautilus_dydx/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Message handler for dYdX WebSocket streams.
17//!
18//! This module processes incoming WebSocket messages and converts them into
19//! Nautilus domain objects.
20//!
21//! The handler owns the WebSocketClient exclusively and runs in a dedicated
22//! Tokio task within the lock-free I/O boundary.
23
24use std::{
25    fmt::{Debug, Formatter},
26    str::FromStr,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_core::nanos::UnixNanos;
35use nautilus_model::{
36    data::{
37        Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38        bar::get_bar_interval_ns,
39    },
40    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41    identifiers::{AccountId, InstrumentId, TradeId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
46use rust_decimal::Decimal;
47use tokio_tungstenite::tungstenite::Message;
48use ustr::Ustr;
49
50use super::{
51    DydxWsError, DydxWsResult,
52    enums::DydxWsChannel,
53    messages::{
54        DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsGenericMsg,
55        DydxWsMessage, DydxWsSubscriptionMsg, NautilusWsMessage,
56    },
57    types::{
58        DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
59        DydxTradeContents,
60    },
61};
62
63/// Commands sent to the feed handler.
64#[derive(Debug, Clone)]
65pub enum HandlerCommand {
66    /// Update a single instrument in the cache.
67    UpdateInstrument(Box<InstrumentAny>),
68    /// Initialize instruments in bulk.
69    InitializeInstruments(Vec<InstrumentAny>),
70    /// Register a bar type for candle subscriptions.
71    RegisterBarType { topic: String, bar_type: BarType },
72    /// Unregister a bar type for candle subscriptions.
73    UnregisterBarType { topic: String },
74    /// Send a text message via WebSocket.
75    SendText(String),
76}
77
78/// Processes incoming WebSocket messages and converts them to Nautilus domain objects.
79///
80/// The handler owns the WebSocketClient exclusively within the lock-free I/O boundary,
81/// eliminating RwLock contention on the hot path.
82pub struct FeedHandler {
83    /// Account ID for parsing account-specific messages.
84    account_id: Option<AccountId>,
85    /// Command receiver from outer client.
86    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
87    /// Output sender for Nautilus messages.
88    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
89    /// Raw WebSocket message receiver.
90    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
91    /// Owned WebSocket client (no RwLock).
92    client: WebSocketClient,
93    /// Manual disconnect signal.
94    signal: Arc<AtomicBool>,
95    /// Cached instruments for parsing market data.
96    instruments: AHashMap<Ustr, InstrumentAny>,
97    /// Cached bar types by topic (e.g., "BTC-USD/1MIN").
98    bar_types: AHashMap<String, BarType>,
99}
100
101impl Debug for FeedHandler {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("FeedHandler")
104            .field("account_id", &self.account_id)
105            .field("instruments_count", &self.instruments.len())
106            .field("bar_types_count", &self.bar_types.len())
107            .finish_non_exhaustive()
108    }
109}
110
111impl FeedHandler {
112    /// Creates a new [`FeedHandler`].
113    #[must_use]
114    pub fn new(
115        account_id: Option<AccountId>,
116        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
117        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
118        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119        client: WebSocketClient,
120        signal: Arc<AtomicBool>,
121    ) -> Self {
122        Self {
123            account_id,
124            cmd_rx,
125            out_tx,
126            raw_rx,
127            client,
128            signal,
129            instruments: AHashMap::new(),
130            bar_types: AHashMap::new(),
131        }
132    }
133
134    /// Main processing loop for the handler.
135    pub async fn run(&mut self) {
136        loop {
137            tokio::select! {
138                // Process commands from outer client
139                Some(cmd) = self.cmd_rx.recv() => {
140                    self.handle_command(cmd).await;
141                }
142
143                // Process raw WebSocket messages
144                Some(msg) = self.raw_rx.recv() => {
145                    if let Some(nautilus_msg) = self.process_raw_message(msg).await
146                        && self.out_tx.send(nautilus_msg).is_err()
147                    {
148                        tracing::debug!("Receiver dropped, stopping handler");
149                        break;
150                    }
151                }
152
153                else => {
154                    tracing::debug!("Handler shutting down: channels closed");
155                    break;
156                }
157            }
158
159            // Check for stop signal
160            if self.signal.load(Ordering::Relaxed) {
161                tracing::debug!("Handler received stop signal");
162                break;
163            }
164        }
165    }
166
167    /// Processes a raw WebSocket message.
168    async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
169        match msg {
170            Message::Text(txt) => {
171                if txt == RECONNECTED {
172                    return Some(NautilusWsMessage::Reconnected);
173                }
174
175                match serde_json::from_str::<serde_json::Value>(&txt) {
176                    Ok(val) => {
177                        // Attempt to classify message using generic envelope
178                        match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
179                            Ok(meta) => {
180                                let result = if meta.is_connected() {
181                                    serde_json::from_value::<DydxWsConnectedMsg>(val)
182                                        .map(DydxWsMessage::Connected)
183                                } else if meta.is_subscribed() {
184                                    // Check if this is a subaccounts subscription with initial state
185                                    if let Ok(sub_msg) =
186                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
187                                    {
188                                        if sub_msg.channel == DydxWsChannel::Subaccounts {
189                                            // Parse as subaccounts-specific subscription message
190                                            serde_json::from_value::<
191                                                crate::schemas::ws::DydxWsSubaccountsSubscribed,
192                                            >(val)
193                                            .map(DydxWsMessage::SubaccountsSubscribed)
194                                        } else {
195                                            Ok(DydxWsMessage::Subscribed(sub_msg))
196                                        }
197                                    } else {
198                                        serde_json::from_value::<DydxWsSubscriptionMsg>(val)
199                                            .map(DydxWsMessage::Subscribed)
200                                    }
201                                } else if meta.is_unsubscribed() {
202                                    serde_json::from_value::<DydxWsSubscriptionMsg>(val)
203                                        .map(DydxWsMessage::Unsubscribed)
204                                } else if meta.is_channel_data() {
205                                    serde_json::from_value::<DydxWsChannelDataMsg>(val)
206                                        .map(DydxWsMessage::ChannelData)
207                                } else if meta.is_channel_batch_data() {
208                                    serde_json::from_value::<DydxWsChannelBatchDataMsg>(val)
209                                        .map(DydxWsMessage::ChannelBatchData)
210                                } else if meta.is_error() {
211                                    serde_json::from_value::<
212                                        crate::websocket::error::DydxWebSocketError,
213                                    >(val)
214                                    .map(DydxWsMessage::Error)
215                                } else {
216                                    Ok(DydxWsMessage::Raw(val))
217                                };
218
219                                match result {
220                                    Ok(dydx_msg) => self.handle_dydx_message(dydx_msg),
221                                    Err(e) => {
222                                        let err = crate::websocket::error::DydxWebSocketError::from_message(
223                                            e.to_string(),
224                                        );
225                                        Some(NautilusWsMessage::Error(err))
226                                    }
227                                }
228                            }
229                            Err(_) => {
230                                // Fallback to raw if generic parse fails
231                                None
232                            }
233                        }
234                    }
235                    Err(e) => {
236                        let err = crate::websocket::error::DydxWebSocketError::from_message(
237                            e.to_string(),
238                        );
239                        Some(NautilusWsMessage::Error(err))
240                    }
241                }
242            }
243            Message::Pong(_data) => None,
244            Message::Ping(_data) => None,  // Handled by lower layers
245            Message::Binary(_bin) => None, // dYdX uses text frames
246            Message::Close(_frame) => {
247                tracing::info!("WebSocket close frame received");
248                None
249            }
250            Message::Frame(_) => None,
251        }
252    }
253
254    /// Handles a parsed dYdX WebSocket message.
255    fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
256        match self.handle_message(msg) {
257            Ok(opt_msg) => opt_msg,
258            Err(e) => {
259                tracing::error!("Error handling message: {e}");
260                None
261            }
262        }
263    }
264
265    /// Handles a command to update the internal state.
266    async fn handle_command(&mut self, command: HandlerCommand) {
267        match command {
268            HandlerCommand::UpdateInstrument(instrument) => {
269                let symbol = instrument.id().symbol.inner();
270                self.instruments.insert(symbol, *instrument);
271            }
272            HandlerCommand::InitializeInstruments(instruments) => {
273                for instrument in instruments {
274                    let symbol = instrument.id().symbol.inner();
275                    self.instruments.insert(symbol, instrument);
276                }
277            }
278            HandlerCommand::RegisterBarType { topic, bar_type } => {
279                self.bar_types.insert(topic, bar_type);
280            }
281            HandlerCommand::UnregisterBarType { topic } => {
282                self.bar_types.remove(&topic);
283            }
284            HandlerCommand::SendText(text) => {
285                if let Err(e) = self.client.send_text(text, None).await {
286                    tracing::error!("Failed to send WebSocket text: {e}");
287                }
288            }
289        }
290    }
291
292    /// Registers a bar type for a specific topic (e.g., "BTC-USD/1MIN").
293    pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
294        self.bar_types.insert(topic, bar_type);
295    }
296
297    /// Unregisters a bar type for a specific topic.
298    pub fn unregister_bar_type(&mut self, topic: &str) {
299        self.bar_types.remove(topic);
300    }
301
302    /// Processes a WebSocket message and converts it to Nautilus domain objects.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if message parsing fails.
307    #[allow(clippy::result_large_err)]
308    pub fn handle_message(&self, msg: DydxWsMessage) -> DydxWsResult<Option<NautilusWsMessage>> {
309        match msg {
310            DydxWsMessage::Connected(_) => {
311                tracing::info!("dYdX WebSocket connected");
312                Ok(None)
313            }
314            DydxWsMessage::Subscribed(sub) => {
315                tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
316                Ok(None)
317            }
318            DydxWsMessage::SubaccountsSubscribed(msg) => {
319                tracing::debug!("Subaccounts subscribed with initial state");
320                self.parse_subaccounts_subscribed(&msg)
321            }
322            DydxWsMessage::Unsubscribed(unsub) => {
323                tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
324                Ok(None)
325            }
326            DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
327            DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
328            DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
329            DydxWsMessage::Reconnected => Ok(Some(NautilusWsMessage::Reconnected)),
330            DydxWsMessage::Pong => Ok(None),
331            DydxWsMessage::Raw(_) => Ok(None),
332        }
333    }
334
335    fn handle_channel_data(
336        &self,
337        data: DydxWsChannelDataMsg,
338    ) -> DydxWsResult<Option<NautilusWsMessage>> {
339        match data.channel {
340            DydxWsChannel::Trades => self.parse_trades(&data),
341            DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
342            DydxWsChannel::Candles => self.parse_candles(&data),
343            DydxWsChannel::Markets => self.parse_markets(&data),
344            DydxWsChannel::Subaccounts => self.parse_subaccounts(&data),
345            DydxWsChannel::BlockHeight => {
346                tracing::debug!("Block height update received");
347                Ok(None)
348            }
349        }
350    }
351
352    fn handle_channel_batch_data(
353        &self,
354        data: DydxWsChannelBatchDataMsg,
355    ) -> DydxWsResult<Option<NautilusWsMessage>> {
356        match data.channel {
357            DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
358            _ => {
359                tracing::warn!("Unexpected batch data for channel: {:?}", data.channel);
360                Ok(None)
361            }
362        }
363    }
364
365    fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
366        let symbol = data
367            .id
368            .as_ref()
369            .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
370
371        let instrument_id = self.parse_instrument_id(symbol)?;
372        let instrument = self.get_instrument(&instrument_id)?;
373
374        let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
375            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
376
377        let mut ticks = Vec::new();
378        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
379
380        for trade in contents.trades {
381            let aggressor_side = match trade.side {
382                OrderSide::Buy => AggressorSide::Buyer,
383                OrderSide::Sell => AggressorSide::Seller,
384                _ => continue, // Skip NoOrderSide
385            };
386
387            let price = Decimal::from_str(&trade.price)
388                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
389
390            let size = Decimal::from_str(&trade.size)
391                .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
392
393            let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
394                DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
395            })?;
396
397            let tick = TradeTick::new(
398                instrument_id,
399                Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
400                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
401                })?,
402                Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
403                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
404                })?,
405                aggressor_side,
406                TradeId::new(&trade.id),
407                UnixNanos::from(trade_ts as u64),
408                ts_init,
409            );
410            ticks.push(Data::Trade(tick));
411        }
412
413        if ticks.is_empty() {
414            Ok(None)
415        } else {
416            Ok(Some(NautilusWsMessage::Data(ticks)))
417        }
418    }
419
420    fn parse_orderbook(
421        &self,
422        data: &DydxWsChannelDataMsg,
423        is_snapshot: bool,
424    ) -> DydxWsResult<Option<NautilusWsMessage>> {
425        let symbol = data
426            .id
427            .as_ref()
428            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
429
430        let instrument_id = self.parse_instrument_id(symbol)?;
431        let instrument = self.get_instrument(&instrument_id)?;
432
433        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
434
435        if is_snapshot {
436            let contents: DydxOrderbookSnapshotContents =
437                serde_json::from_value(data.contents.clone()).map_err(|e| {
438                    DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
439                })?;
440
441            let deltas = self.parse_orderbook_snapshot(
442                &instrument_id,
443                &contents,
444                instrument.price_precision(),
445                instrument.size_precision(),
446                ts_init,
447            )?;
448
449            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
450        } else {
451            let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
452                .map_err(|e| {
453                    DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
454                })?;
455
456            let deltas = self.parse_orderbook_deltas(
457                &instrument_id,
458                &contents,
459                instrument.price_precision(),
460                instrument.size_precision(),
461                ts_init,
462            )?;
463
464            Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
465        }
466    }
467
468    fn parse_orderbook_batch(
469        &self,
470        data: &DydxWsChannelBatchDataMsg,
471    ) -> DydxWsResult<Option<NautilusWsMessage>> {
472        let symbol = data
473            .id
474            .as_ref()
475            .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
476
477        let instrument_id = self.parse_instrument_id(symbol)?;
478        let instrument = self.get_instrument(&instrument_id)?;
479
480        let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
481            .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
482
483        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
484        let mut all_deltas = Vec::new();
485
486        let num_messages = contents.len();
487        for (idx, content) in contents.iter().enumerate() {
488            let is_last_message = idx == num_messages - 1;
489            let deltas = self.parse_orderbook_deltas_with_flag(
490                &instrument_id,
491                content,
492                instrument.price_precision(),
493                instrument.size_precision(),
494                ts_init,
495                is_last_message,
496            )?;
497            all_deltas.extend(deltas);
498        }
499
500        let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
501        Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
502    }
503
504    fn parse_orderbook_snapshot(
505        &self,
506        instrument_id: &InstrumentId,
507        contents: &DydxOrderbookSnapshotContents,
508        price_precision: u8,
509        size_precision: u8,
510        ts_init: UnixNanos,
511    ) -> DydxWsResult<OrderBookDeltas> {
512        let mut deltas = Vec::new();
513
514        // Add clear delta first
515        deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
516
517        let bids = contents.bids.as_deref().unwrap_or(&[]);
518        let asks = contents.asks.as_deref().unwrap_or(&[]);
519
520        let bids_len = bids.len();
521        let asks_len = asks.len();
522
523        for (idx, bid) in bids.iter().enumerate() {
524            let is_last = idx == bids_len - 1 && asks_len == 0;
525            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
526
527            let price = Decimal::from_str(&bid.price)
528                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
529
530            let size = Decimal::from_str(&bid.size)
531                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
532
533            let order = BookOrder::new(
534                OrderSide::Buy,
535                Price::from_decimal_dp(price, price_precision).map_err(|e| {
536                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
537                })?,
538                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
539                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
540                })?,
541                0,
542            );
543
544            deltas.push(OrderBookDelta::new(
545                *instrument_id,
546                BookAction::Add,
547                order,
548                flags,
549                0,
550                ts_init,
551                ts_init,
552            ));
553        }
554
555        for (idx, ask) in asks.iter().enumerate() {
556            let is_last = idx == asks_len - 1;
557            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
558
559            let price = Decimal::from_str(&ask.price)
560                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
561
562            let size = Decimal::from_str(&ask.size)
563                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
564
565            let order = BookOrder::new(
566                OrderSide::Sell,
567                Price::from_decimal_dp(price, price_precision).map_err(|e| {
568                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
569                })?,
570                Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
571                    DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
572                })?,
573                0,
574            );
575
576            deltas.push(OrderBookDelta::new(
577                *instrument_id,
578                BookAction::Add,
579                order,
580                flags,
581                0,
582                ts_init,
583                ts_init,
584            ));
585        }
586
587        Ok(OrderBookDeltas::new(*instrument_id, deltas))
588    }
589
590    fn parse_orderbook_deltas(
591        &self,
592        instrument_id: &InstrumentId,
593        contents: &DydxOrderbookContents,
594        price_precision: u8,
595        size_precision: u8,
596        ts_init: UnixNanos,
597    ) -> DydxWsResult<OrderBookDeltas> {
598        let deltas = self.parse_orderbook_deltas_with_flag(
599            instrument_id,
600            contents,
601            price_precision,
602            size_precision,
603            ts_init,
604            true, // Mark as last message by default
605        )?;
606        Ok(OrderBookDeltas::new(*instrument_id, deltas))
607    }
608
609    #[allow(clippy::too_many_arguments)]
610    fn parse_orderbook_deltas_with_flag(
611        &self,
612        instrument_id: &InstrumentId,
613        contents: &DydxOrderbookContents,
614        price_precision: u8,
615        size_precision: u8,
616        ts_init: UnixNanos,
617        is_last_message: bool,
618    ) -> DydxWsResult<Vec<OrderBookDelta>> {
619        let mut deltas = Vec::new();
620
621        let bids = contents.bids.as_deref().unwrap_or(&[]);
622        let asks = contents.asks.as_deref().unwrap_or(&[]);
623
624        let bids_len = bids.len();
625        let asks_len = asks.len();
626
627        for (idx, (price_str, size_str)) in bids.iter().enumerate() {
628            let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
629            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
630
631            let price = Decimal::from_str(price_str)
632                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
633
634            let size = Decimal::from_str(size_str)
635                .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
636
637            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
638                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
639            })?;
640            let action = if qty.is_zero() {
641                BookAction::Delete
642            } else {
643                BookAction::Update
644            };
645
646            let order = BookOrder::new(
647                OrderSide::Buy,
648                Price::from_decimal_dp(price, price_precision).map_err(|e| {
649                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
650                })?,
651                qty,
652                0,
653            );
654
655            deltas.push(OrderBookDelta::new(
656                *instrument_id,
657                action,
658                order,
659                flags,
660                0,
661                ts_init,
662                ts_init,
663            ));
664        }
665
666        for (idx, (price_str, size_str)) in asks.iter().enumerate() {
667            let is_last = is_last_message && idx == asks_len - 1;
668            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
669
670            let price = Decimal::from_str(price_str)
671                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
672
673            let size = Decimal::from_str(size_str)
674                .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
675
676            let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
677                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
678            })?;
679            let action = if qty.is_zero() {
680                BookAction::Delete
681            } else {
682                BookAction::Update
683            };
684
685            let order = BookOrder::new(
686                OrderSide::Sell,
687                Price::from_decimal_dp(price, price_precision).map_err(|e| {
688                    DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
689                })?,
690                qty,
691                0,
692            );
693
694            deltas.push(OrderBookDelta::new(
695                *instrument_id,
696                action,
697                order,
698                flags,
699                0,
700                ts_init,
701                ts_init,
702            ));
703        }
704
705        Ok(deltas)
706    }
707
708    fn parse_candles(
709        &self,
710        data: &DydxWsChannelDataMsg,
711    ) -> DydxWsResult<Option<NautilusWsMessage>> {
712        let topic = data
713            .id
714            .as_ref()
715            .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
716
717        let bar_type = self.bar_types.get(topic).ok_or_else(|| {
718            DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
719        })?;
720
721        let candle: DydxCandle = serde_json::from_value(data.contents.clone())
722            .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
723
724        let instrument_id = self.parse_instrument_id(&candle.ticker)?;
725        let instrument = self.get_instrument(&instrument_id)?;
726
727        let open = Decimal::from_str(&candle.open)
728            .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
729        let high = Decimal::from_str(&candle.high)
730            .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
731        let low = Decimal::from_str(&candle.low)
732            .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
733        let close = Decimal::from_str(&candle.close)
734            .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
735        let volume = Decimal::from_str(&candle.base_token_volume)
736            .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
737
738        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
739
740        // Calculate ts_event: startedAt + interval
741        let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
742            DydxWsError::Parse(format!(
743                "Timestamp out of range for candle at {}",
744                candle.started_at
745            ))
746        })?;
747        let interval_nanos = get_bar_interval_ns(bar_type);
748        let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
749
750        let bar = Bar::new(
751            *bar_type,
752            Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
753                DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
754            })?,
755            Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
756                DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
757            })?,
758            Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
759                DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
760            })?,
761            Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
762                DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
763            })?,
764            Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
765                DydxWsError::Parse(format!(
766                    "Failed to create volume Quantity from decimal: {e}"
767                ))
768            })?,
769            ts_event,
770            ts_init,
771        );
772
773        Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
774    }
775
776    fn parse_markets(
777        &self,
778        data: &DydxWsChannelDataMsg,
779    ) -> DydxWsResult<Option<NautilusWsMessage>> {
780        let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
781            .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
782
783        // Markets channel provides oracle price updates needed for margin calculations
784        // Forward to execution client to update oracle_prices map
785        if let Some(oracle_prices) = contents.oracle_prices {
786            tracing::debug!(
787                "Forwarding oracle price updates for {} markets to execution client",
788                oracle_prices.len()
789            );
790            return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
791        }
792
793        Ok(None)
794    }
795
796    fn parse_subaccounts(
797        &self,
798        data: &DydxWsChannelDataMsg,
799    ) -> DydxWsResult<Option<NautilusWsMessage>> {
800        use crate::schemas::ws::{DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData};
801
802        let contents: DydxWsSubaccountsChannelContents =
803            serde_json::from_value(data.contents.clone()).map_err(|e| {
804                DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
805            })?;
806
807        // Check if we have any orders or fills
808        let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
809        let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
810
811        if has_orders || has_fills {
812            // Forward raw channel data to execution client for parsing
813            // The execution client has the clob_pair_id and instrument mappings needed
814            tracing::debug!(
815                "Received {} order(s), {} fill(s) - forwarding to execution client",
816                contents.orders.as_ref().map_or(0, |o| o.len()),
817                contents.fills.as_ref().map_or(0, |f| f.len())
818            );
819
820            let channel_data = DydxWsSubaccountsChannelData {
821                msg_type: data.msg_type,
822                connection_id: data.connection_id.clone(),
823                message_id: data.message_id,
824                id: data.id.clone().unwrap_or_default(),
825                channel: data.channel,
826                version: data.version.clone().unwrap_or_default(),
827                contents,
828            };
829
830            return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
831                channel_data,
832            ))));
833        }
834
835        Ok(None)
836    }
837
838    fn parse_subaccounts_subscribed(
839        &self,
840        msg: &crate::schemas::ws::DydxWsSubaccountsSubscribed,
841    ) -> DydxWsResult<Option<NautilusWsMessage>> {
842        // Pass raw subaccount subscription to execution client for parsing
843        // The execution client has access to instruments and oracle prices needed for margin calculations
844        tracing::debug!("Forwarding subaccount subscription to execution client");
845        Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
846            msg.clone(),
847        ))))
848    }
849
850    fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
851        // dYdX WS uses raw symbols (e.g., "BTC-USD")
852        // Need to append "-PERP" to match Nautilus instrument IDs
853        let symbol_with_perp = format!("{symbol}-PERP");
854        Ok(crate::common::parse::parse_instrument_id(&symbol_with_perp))
855    }
856
857    fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
858        self.instruments
859            .get(&instrument_id.symbol.inner())
860            .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
861    }
862}