1use std::{
25 fmt::Debug,
26 str::FromStr,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36 data::{
37 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38 bar::get_bar_interval_ns,
39 },
40 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41 identifiers::{AccountId, InstrumentId, TradeId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{
46 RECONNECTED,
47 retry::{RetryManager, create_websocket_retry_manager},
48 websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55 DydxWsError, DydxWsResult,
56 client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57 enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58 error::DydxWebSocketError,
59 messages::{
60 DydxBlockHeightChannelContents, DydxCandle, DydxMarketsContents, DydxOrderbookContents,
61 DydxOrderbookSnapshotContents, DydxTradeContents, DydxWsBlockHeightMessage,
62 DydxWsCandlesMessage, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
63 DydxWsFeedMessage, DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
64 DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage,
65 DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg, DydxWsTradesMessage,
66 },
67};
68use crate::common::parse::parse_instrument_id;
69
70#[derive(Debug, Clone)]
72pub enum HandlerCommand {
73 UpdateInstrument(Box<InstrumentAny>),
75 InitializeInstruments(Vec<InstrumentAny>),
77 RegisterBarType { topic: String, bar_type: BarType },
79 UnregisterBarType { topic: String },
81 SendText(String),
83}
84
85pub struct FeedHandler {
90 account_id: Option<AccountId>,
92 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
94 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
96 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
98 client: WebSocketClient,
100 signal: Arc<AtomicBool>,
102 retry_manager: RetryManager<DydxWsError>,
104 instruments: AHashMap<Ustr, InstrumentAny>,
106 bar_types: AHashMap<String, BarType>,
108 subscriptions: SubscriptionState,
110}
111
112impl Debug for FeedHandler {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct(stringify!(FeedHandler))
115 .field("account_id", &self.account_id)
116 .field("instruments_count", &self.instruments.len())
117 .field("bar_types_count", &self.bar_types.len())
118 .finish_non_exhaustive()
119 }
120}
121
122impl FeedHandler {
123 #[must_use]
125 pub fn new(
126 account_id: Option<AccountId>,
127 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
128 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
129 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
130 client: WebSocketClient,
131 signal: Arc<AtomicBool>,
132 subscriptions: SubscriptionState,
133 ) -> Self {
134 Self {
135 account_id,
136 cmd_rx,
137 out_tx,
138 raw_rx,
139 client,
140 signal,
141 retry_manager: create_websocket_retry_manager(),
142 instruments: AHashMap::new(),
143 bar_types: AHashMap::new(),
144 subscriptions,
145 }
146 }
147
148 async fn send_with_retry(
152 &self,
153 payload: String,
154 rate_limit_keys: Option<Vec<String>>,
155 ) -> Result<(), DydxWsError> {
156 self.retry_manager
157 .execute_with_retry(
158 "websocket_send",
159 || {
160 let payload = payload.clone();
161 let keys = rate_limit_keys.clone();
162 async move {
163 self.client
164 .send_text(payload, keys)
165 .await
166 .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
167 }
168 },
169 should_retry_dydx_error,
170 create_dydx_timeout_error,
171 )
172 .await
173 }
174
175 pub async fn run(&mut self) {
177 loop {
178 tokio::select! {
179 Some(cmd) = self.cmd_rx.recv() => {
180 self.handle_command(cmd).await;
181 }
182
183 Some(msg) = self.raw_rx.recv() => {
184 if let Some(nautilus_msg) = self.process_raw_message(msg).await
185 && self.out_tx.send(nautilus_msg).is_err()
186 {
187 log::debug!("Receiver dropped, stopping handler");
188 break;
189 }
190 }
191
192 else => {
193 log::debug!("Handler shutting down: channels closed");
194 break;
195 }
196 }
197
198 if self.signal.load(Ordering::Relaxed) {
199 log::debug!("Handler received stop signal");
200 break;
201 }
202 }
203 }
204
205 async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
207 match msg {
208 Message::Text(txt) => {
209 if txt == RECONNECTED {
210 if let Err(e) = self.replay_subscriptions().await {
211 log::error!("Failed to replay subscriptions after reconnect: {e}");
212 }
213 return Some(NautilusWsMessage::Reconnected);
214 }
215
216 match serde_json::from_str::<serde_json::Value>(&txt) {
217 Ok(val) => {
218 let val_clone = val.clone();
219
220 if let Ok(feed_msg) =
222 serde_json::from_value::<DydxWsFeedMessage>(val.clone())
223 {
224 return self.handle_feed_message(feed_msg).await;
225 }
226
227 match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
230 Ok(meta) => {
231 let result = if meta.is_connected() {
232 serde_json::from_value::<DydxWsConnectedMsg>(val)
233 .map(DydxWsMessage::Connected)
234 } else if meta.is_subscribed() {
235 if let Ok(sub_msg) =
236 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
237 {
238 if sub_msg.channel == DydxWsChannel::Subaccounts {
239 serde_json::from_value::<DydxWsSubaccountsSubscribed>(
240 val.clone(),
241 )
242 .map(DydxWsMessage::SubaccountsSubscribed)
243 .or_else(|e| {
244 log::debug!(
245 "Failed to parse subaccounts subscription (fallback to standard): {e}. Raw: {}",
246 serde_json::to_string(&val_clone)
247 .unwrap_or_else(|_| "<invalid json>".into())
248 );
249 Ok(DydxWsMessage::Subscribed(sub_msg))
250 })
251 } else {
252 Ok(DydxWsMessage::Subscribed(sub_msg))
253 }
254 } else {
255 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
256 .map(DydxWsMessage::Subscribed)
257 }
258 } else if meta.is_unsubscribed() {
259 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
260 .map(DydxWsMessage::Unsubscribed)
261 } else if meta.is_error() {
262 serde_json::from_value::<DydxWebSocketError>(val)
263 .map(DydxWsMessage::Error)
264 } else if meta.is_unknown() {
265 log::warn!(
266 "Received unknown WebSocket message type: {}",
267 serde_json::to_string(&val_clone)
268 .unwrap_or_else(|_| "<invalid json>".into())
269 );
270 Ok(DydxWsMessage::Raw(val))
271 } else {
272 Ok(DydxWsMessage::Raw(val))
273 };
274
275 match result {
276 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
277 Err(e) => {
278 log::error!(
279 "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {}",
280 meta.msg_type,
281 meta.channel,
282 serde_json::to_string(&val_clone)
283 .unwrap_or_else(|_| "<invalid json>".into())
284 );
285 None
286 }
287 }
288 }
289 Err(e) => {
290 let raw_json = serde_json::to_string_pretty(&val_clone)
291 .unwrap_or_else(|_| format!("{val_clone:?}"));
292 log::error!(
293 "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{raw_json}"
294 );
295 None
296 }
297 }
298 }
299 Err(e) => {
300 let err = DydxWebSocketError::from_message(e.to_string());
301 Some(NautilusWsMessage::Error(err))
302 }
303 }
304 }
305 Message::Pong(_data) => None,
306 Message::Ping(_data) => None, Message::Binary(_bin) => None, Message::Close(_frame) => {
309 log::info!("WebSocket close frame received");
310 None
311 }
312 Message::Frame(_) => None,
313 }
314 }
315
316 async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
318 match self.handle_message(msg).await {
319 Ok(opt_msg) => opt_msg,
320 Err(e) => {
321 log::error!("Error handling message: {e}");
322 None
323 }
324 }
325 }
326
327 async fn handle_feed_message(&self, feed_msg: DydxWsFeedMessage) -> Option<NautilusWsMessage> {
329 match feed_msg {
330 DydxWsFeedMessage::Subaccounts(msg) => match msg {
331 DydxWsSubaccountsMessage::Subscribed(data) => {
332 self.handle_dydx_message(DydxWsMessage::SubaccountsSubscribed(data))
333 .await
334 }
335 DydxWsSubaccountsMessage::ChannelData(data) => {
336 self.handle_dydx_message(DydxWsMessage::ChannelData(DydxWsChannelDataMsg {
337 msg_type: data.msg_type,
338 connection_id: data.connection_id,
339 message_id: data.message_id,
340 channel: data.channel,
341 id: Some(data.id),
342 contents: serde_json::to_value(&data.contents)
343 .unwrap_or(serde_json::Value::Null),
344 version: Some(data.version),
345 }))
346 .await
347 }
348 },
349 DydxWsFeedMessage::Orderbook(msg) => match msg {
350 DydxWsOrderbookMessage::Subscribed(data)
351 | DydxWsOrderbookMessage::ChannelData(data) => {
352 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
353 .await
354 }
355 DydxWsOrderbookMessage::ChannelBatchData(data) => {
356 self.handle_dydx_message(DydxWsMessage::ChannelBatchData(data))
357 .await
358 }
359 },
360 DydxWsFeedMessage::Trades(msg) => match msg {
361 DydxWsTradesMessage::Subscribed(data) | DydxWsTradesMessage::ChannelData(data) => {
362 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
363 .await
364 }
365 },
366 DydxWsFeedMessage::Markets(msg) => match msg {
367 DydxWsMarketsMessage::Subscribed(data)
368 | DydxWsMarketsMessage::ChannelData(data) => {
369 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
370 .await
371 }
372 },
373 DydxWsFeedMessage::Candles(msg) => match msg {
374 DydxWsCandlesMessage::Subscribed(data)
375 | DydxWsCandlesMessage::ChannelData(data) => {
376 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
377 .await
378 }
379 },
380 DydxWsFeedMessage::ParentSubaccounts(msg) => match msg {
381 super::messages::DydxWsParentSubaccountsMessage::Subscribed(data)
382 | super::messages::DydxWsParentSubaccountsMessage::ChannelData(data) => {
383 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
384 .await
385 }
386 },
387 DydxWsFeedMessage::BlockHeight(msg) => match msg {
388 DydxWsBlockHeightMessage::Subscribed(data) => {
389 match data.contents.height.parse::<u64>() {
391 Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
392 Err(e) => {
393 log::warn!("Failed to parse block height from subscription: {e}");
394 None
395 }
396 }
397 }
398 DydxWsBlockHeightMessage::ChannelData(data) => {
399 match data.contents.block_height.parse::<u64>() {
401 Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
402 Err(e) => {
403 log::warn!("Failed to parse block height from channel data: {e}");
404 None
405 }
406 }
407 }
408 },
409 }
410 }
411
412 async fn handle_command(&mut self, command: HandlerCommand) {
414 match command {
415 HandlerCommand::UpdateInstrument(instrument) => {
416 let symbol = instrument.id().symbol.inner();
417 self.instruments.insert(symbol, *instrument);
418 }
419 HandlerCommand::InitializeInstruments(instruments) => {
420 for instrument in instruments {
421 let symbol = instrument.id().symbol.inner();
422 self.instruments.insert(symbol, instrument);
423 }
424 }
425 HandlerCommand::RegisterBarType { topic, bar_type } => {
426 self.bar_types.insert(topic, bar_type);
427 }
428 HandlerCommand::UnregisterBarType { topic } => {
429 self.bar_types.remove(&topic);
430 }
431 HandlerCommand::SendText(text) => {
432 if let Err(e) = self
433 .send_with_retry(
434 text,
435 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
436 )
437 .await
438 {
439 log::error!("Failed to send WebSocket text after retries: {e}");
440 }
441 }
442 }
443 }
444
445 pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
447 self.bar_types.insert(topic, bar_type);
448 }
449
450 pub fn unregister_bar_type(&mut self, topic: &str) {
452 self.bar_types.remove(topic);
453 }
454
455 fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
456 match id {
457 Some(id) => format!(
458 "{}{}{}",
459 channel.as_ref(),
460 self.subscriptions.delimiter(),
461 id
462 ),
463 None => channel.as_ref().to_string(),
464 }
465 }
466
467 fn subscription_from_topic(
468 &self,
469 topic: &str,
470 op: super::enums::DydxWsOperation,
471 ) -> Option<super::messages::DydxSubscription> {
472 let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
473 topic,
474 self.subscriptions.delimiter(),
475 );
476 let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
477 let id = symbol.map(std::string::ToString::to_string);
478
479 Some(super::messages::DydxSubscription { op, channel, id })
480 }
481
482 async fn replay_subscriptions(&self) -> DydxWsResult<()> {
483 let topics = self.subscriptions.all_topics();
484 for topic in topics {
485 let Some(subscription) =
486 self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
487 else {
488 log::warn!("Failed to reconstruct subscription from topic: {topic}");
489 continue;
490 };
491
492 let payload = serde_json::to_string(&subscription)?;
493 self.subscriptions.mark_subscribe(&topic);
494
495 if let Err(e) = self
496 .send_with_retry(
497 payload,
498 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
499 )
500 .await
501 {
502 self.subscriptions.mark_failure(&topic);
503 return Err(e);
504 }
505 }
506
507 Ok(())
508 }
509
510 #[allow(clippy::result_large_err)]
516 pub async fn handle_message(
517 &self,
518 msg: DydxWsMessage,
519 ) -> DydxWsResult<Option<NautilusWsMessage>> {
520 match msg {
521 DydxWsMessage::Connected(_) => {
522 log::info!("dYdX WebSocket connected");
523 Ok(None)
524 }
525 DydxWsMessage::Subscribed(sub) => {
526 log::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
527 let topic = self.topic_from_msg(&sub.channel, &sub.id);
528 self.subscriptions.confirm_subscribe(&topic);
529 Ok(None)
530 }
531 DydxWsMessage::SubaccountsSubscribed(msg) => {
532 log::debug!("Subaccounts subscribed with initial state");
533 let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
534 self.subscriptions.confirm_subscribe(&topic);
535 self.parse_subaccounts_subscribed(&msg)
536 }
537 DydxWsMessage::Unsubscribed(unsub) => {
538 log::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
539 let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
540 self.subscriptions.confirm_unsubscribe(&topic);
541 Ok(None)
542 }
543 DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
544 DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
545 DydxWsMessage::BlockHeight(height) => Ok(Some(NautilusWsMessage::BlockHeight(height))),
546 DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
547 DydxWsMessage::Reconnected => {
548 if let Err(e) = self.replay_subscriptions().await {
549 log::error!("Failed to replay subscriptions after reconnect message: {e}");
550 }
551 Ok(Some(NautilusWsMessage::Reconnected))
552 }
553 DydxWsMessage::Pong => Ok(None),
554 DydxWsMessage::Raw(_) => Ok(None),
555 }
556 }
557
558 fn handle_channel_data(
559 &self,
560 data: DydxWsChannelDataMsg,
561 ) -> DydxWsResult<Option<NautilusWsMessage>> {
562 match data.channel {
563 DydxWsChannel::Trades => self.parse_trades(&data),
564 DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
565 DydxWsChannel::Candles => self.parse_candles(&data),
566 DydxWsChannel::Markets => self.parse_markets(&data),
567 DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
568 self.parse_subaccounts(&data)
569 }
570 DydxWsChannel::BlockHeight => self.parse_block_height(&data),
571 DydxWsChannel::Unknown => {
572 log::warn!(
573 "Unknown channel data received: id={:?}, msg_type={:?}",
574 data.id,
575 data.msg_type
576 );
577 Ok(None)
578 }
579 }
580 }
581
582 fn handle_channel_batch_data(
583 &self,
584 data: DydxWsChannelBatchDataMsg,
585 ) -> DydxWsResult<Option<NautilusWsMessage>> {
586 match data.channel {
587 DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
588 _ => {
589 log::warn!(
590 "Unexpected batch data for channel: {:?}, id={:?}",
591 data.channel,
592 data.id
593 );
594 Ok(None)
595 }
596 }
597 }
598
599 fn parse_block_height(
600 &self,
601 data: &DydxWsChannelDataMsg,
602 ) -> DydxWsResult<Option<NautilusWsMessage>> {
603 let contents: DydxBlockHeightChannelContents =
604 serde_json::from_value(data.contents.clone()).map_err(|e| {
605 DydxWsError::Parse(format!("Failed to parse block height contents: {e}"))
606 })?;
607
608 let height = contents
609 .block_height
610 .parse::<u64>()
611 .map_err(|e| DydxWsError::Parse(format!("Failed to parse block height: {e}")))?;
612
613 Ok(Some(NautilusWsMessage::BlockHeight(height)))
614 }
615
616 fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
617 let symbol = data
618 .id
619 .as_ref()
620 .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
621
622 let instrument_id = self.parse_instrument_id(symbol)?;
623 let instrument = self.get_instrument(&instrument_id)?;
624
625 let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
626 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
627
628 let mut ticks = Vec::new();
629 let ts_init = get_atomic_clock_realtime().get_time_ns();
630
631 for trade in contents.trades {
632 let aggressor_side = match trade.side {
633 OrderSide::Buy => AggressorSide::Buyer,
634 OrderSide::Sell => AggressorSide::Seller,
635 _ => continue, };
637
638 let price = Decimal::from_str(&trade.price)
639 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
640
641 let size = Decimal::from_str(&trade.size)
642 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
643
644 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
645 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
646 })?;
647
648 let tick = TradeTick::new(
649 instrument_id,
650 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
651 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
652 })?,
653 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
654 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
655 })?,
656 aggressor_side,
657 TradeId::new(&trade.id),
658 UnixNanos::from(trade_ts as u64),
659 ts_init,
660 );
661 ticks.push(Data::Trade(tick));
662 }
663
664 if ticks.is_empty() {
665 Ok(None)
666 } else {
667 Ok(Some(NautilusWsMessage::Data(ticks)))
668 }
669 }
670
671 fn parse_orderbook(
672 &self,
673 data: &DydxWsChannelDataMsg,
674 is_snapshot: bool,
675 ) -> DydxWsResult<Option<NautilusWsMessage>> {
676 let symbol = data
677 .id
678 .as_ref()
679 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
680
681 let instrument_id = self.parse_instrument_id(symbol)?;
682 let instrument = self.get_instrument(&instrument_id)?;
683
684 let ts_init = get_atomic_clock_realtime().get_time_ns();
685
686 if is_snapshot {
687 let contents: DydxOrderbookSnapshotContents =
688 serde_json::from_value(data.contents.clone()).map_err(|e| {
689 DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
690 })?;
691
692 let deltas = self.parse_orderbook_snapshot(
693 &instrument_id,
694 &contents,
695 instrument.price_precision(),
696 instrument.size_precision(),
697 ts_init,
698 )?;
699
700 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
701 } else {
702 let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
703 .map_err(|e| {
704 DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
705 })?;
706
707 let deltas = self.parse_orderbook_deltas(
708 &instrument_id,
709 &contents,
710 instrument.price_precision(),
711 instrument.size_precision(),
712 ts_init,
713 )?;
714
715 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
716 }
717 }
718
719 fn parse_orderbook_batch(
720 &self,
721 data: &DydxWsChannelBatchDataMsg,
722 ) -> DydxWsResult<Option<NautilusWsMessage>> {
723 let symbol = data
724 .id
725 .as_ref()
726 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
727
728 let instrument_id = self.parse_instrument_id(symbol)?;
729 let instrument = self.get_instrument(&instrument_id)?;
730
731 let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
732 .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
733
734 let ts_init = get_atomic_clock_realtime().get_time_ns();
735 let mut all_deltas = Vec::new();
736
737 let num_messages = contents.len();
738 for (idx, content) in contents.iter().enumerate() {
739 let is_last_message = idx == num_messages - 1;
740 let deltas = self.parse_orderbook_deltas_with_flag(
741 &instrument_id,
742 content,
743 instrument.price_precision(),
744 instrument.size_precision(),
745 ts_init,
746 is_last_message,
747 )?;
748 all_deltas.extend(deltas);
749 }
750
751 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
752 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
753 }
754
755 fn parse_orderbook_snapshot(
756 &self,
757 instrument_id: &InstrumentId,
758 contents: &DydxOrderbookSnapshotContents,
759 price_precision: u8,
760 size_precision: u8,
761 ts_init: UnixNanos,
762 ) -> DydxWsResult<OrderBookDeltas> {
763 let mut deltas = Vec::new();
764 deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
765
766 let bids = contents.bids.as_deref().unwrap_or(&[]);
767 let asks = contents.asks.as_deref().unwrap_or(&[]);
768
769 let bids_len = bids.len();
770 let asks_len = asks.len();
771
772 for (idx, bid) in bids.iter().enumerate() {
773 let is_last = idx == bids_len - 1 && asks_len == 0;
774 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
775
776 let price = Decimal::from_str(&bid.price)
777 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
778
779 let size = Decimal::from_str(&bid.size)
780 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
781
782 let order = BookOrder::new(
783 OrderSide::Buy,
784 Price::from_decimal_dp(price, price_precision).map_err(|e| {
785 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
786 })?,
787 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
788 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
789 })?,
790 0,
791 );
792
793 deltas.push(OrderBookDelta::new(
794 *instrument_id,
795 BookAction::Add,
796 order,
797 flags,
798 0,
799 ts_init,
800 ts_init,
801 ));
802 }
803
804 for (idx, ask) in asks.iter().enumerate() {
805 let is_last = idx == asks_len - 1;
806 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
807
808 let price = Decimal::from_str(&ask.price)
809 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
810
811 let size = Decimal::from_str(&ask.size)
812 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
813
814 let order = BookOrder::new(
815 OrderSide::Sell,
816 Price::from_decimal_dp(price, price_precision).map_err(|e| {
817 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
818 })?,
819 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
820 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
821 })?,
822 0,
823 );
824
825 deltas.push(OrderBookDelta::new(
826 *instrument_id,
827 BookAction::Add,
828 order,
829 flags,
830 0,
831 ts_init,
832 ts_init,
833 ));
834 }
835
836 Ok(OrderBookDeltas::new(*instrument_id, deltas))
837 }
838
839 fn parse_orderbook_deltas(
840 &self,
841 instrument_id: &InstrumentId,
842 contents: &DydxOrderbookContents,
843 price_precision: u8,
844 size_precision: u8,
845 ts_init: UnixNanos,
846 ) -> DydxWsResult<OrderBookDeltas> {
847 let deltas = self.parse_orderbook_deltas_with_flag(
848 instrument_id,
849 contents,
850 price_precision,
851 size_precision,
852 ts_init,
853 true, )?;
855 Ok(OrderBookDeltas::new(*instrument_id, deltas))
856 }
857
858 #[allow(clippy::too_many_arguments)]
859 fn parse_orderbook_deltas_with_flag(
860 &self,
861 instrument_id: &InstrumentId,
862 contents: &DydxOrderbookContents,
863 price_precision: u8,
864 size_precision: u8,
865 ts_init: UnixNanos,
866 is_last_message: bool,
867 ) -> DydxWsResult<Vec<OrderBookDelta>> {
868 let mut deltas = Vec::new();
869
870 let bids = contents.bids.as_deref().unwrap_or(&[]);
871 let asks = contents.asks.as_deref().unwrap_or(&[]);
872
873 let bids_len = bids.len();
874 let asks_len = asks.len();
875
876 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
877 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
878 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
879
880 let price = Decimal::from_str(price_str)
881 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
882
883 let size = Decimal::from_str(size_str)
884 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
885
886 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
887 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
888 })?;
889 let action = if qty.is_zero() {
890 BookAction::Delete
891 } else {
892 BookAction::Update
893 };
894
895 let order = BookOrder::new(
896 OrderSide::Buy,
897 Price::from_decimal_dp(price, price_precision).map_err(|e| {
898 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
899 })?,
900 qty,
901 0,
902 );
903
904 deltas.push(OrderBookDelta::new(
905 *instrument_id,
906 action,
907 order,
908 flags,
909 0,
910 ts_init,
911 ts_init,
912 ));
913 }
914
915 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
916 let is_last = is_last_message && idx == asks_len - 1;
917 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
918
919 let price = Decimal::from_str(price_str)
920 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
921
922 let size = Decimal::from_str(size_str)
923 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
924
925 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
926 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
927 })?;
928 let action = if qty.is_zero() {
929 BookAction::Delete
930 } else {
931 BookAction::Update
932 };
933
934 let order = BookOrder::new(
935 OrderSide::Sell,
936 Price::from_decimal_dp(price, price_precision).map_err(|e| {
937 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
938 })?,
939 qty,
940 0,
941 );
942
943 deltas.push(OrderBookDelta::new(
944 *instrument_id,
945 action,
946 order,
947 flags,
948 0,
949 ts_init,
950 ts_init,
951 ));
952 }
953
954 Ok(deltas)
955 }
956
957 fn parse_candles(
958 &self,
959 data: &DydxWsChannelDataMsg,
960 ) -> DydxWsResult<Option<NautilusWsMessage>> {
961 let topic = data
962 .id
963 .as_ref()
964 .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
965
966 let bar_type = self.bar_types.get(topic).ok_or_else(|| {
967 DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
968 })?;
969
970 let candle: DydxCandle = serde_json::from_value(data.contents.clone())
971 .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
972
973 let instrument_id = self.parse_instrument_id(&candle.ticker)?;
974 let instrument = self.get_instrument(&instrument_id)?;
975
976 let open = Decimal::from_str(&candle.open)
977 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
978 let high = Decimal::from_str(&candle.high)
979 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
980 let low = Decimal::from_str(&candle.low)
981 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
982 let close = Decimal::from_str(&candle.close)
983 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
984 let volume = Decimal::from_str(&candle.base_token_volume)
985 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
986
987 let ts_init = get_atomic_clock_realtime().get_time_ns();
988
989 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
990 DydxWsError::Parse(format!(
991 "Timestamp out of range for candle at {}",
992 candle.started_at
993 ))
994 })?;
995 let interval_nanos = get_bar_interval_ns(bar_type);
996 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
997
998 let bar = Bar::new(
999 *bar_type,
1000 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
1001 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
1002 })?,
1003 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
1004 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
1005 })?,
1006 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
1007 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
1008 })?,
1009 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
1010 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
1011 })?,
1012 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
1013 DydxWsError::Parse(format!(
1014 "Failed to create volume Quantity from decimal: {e}"
1015 ))
1016 })?,
1017 ts_event,
1018 ts_init,
1019 );
1020
1021 Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
1022 }
1023
1024 fn parse_markets(
1025 &self,
1026 data: &DydxWsChannelDataMsg,
1027 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1028 let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
1029 .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
1030
1031 if let Some(oracle_prices) = contents.oracle_prices {
1034 log::debug!(
1035 "Forwarding oracle price updates for {} markets to execution client",
1036 oracle_prices.len()
1037 );
1038 return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
1039 }
1040
1041 Ok(None)
1042 }
1043
1044 fn parse_subaccounts(
1045 &self,
1046 data: &DydxWsChannelDataMsg,
1047 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1048 let contents: DydxWsSubaccountsChannelContents =
1049 serde_json::from_value(data.contents.clone()).map_err(|e| {
1050 DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1051 })?;
1052
1053 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1054 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1055
1056 if has_orders || has_fills {
1057 log::debug!(
1060 "Received {} order(s), {} fill(s) - forwarding to execution client",
1061 contents.orders.as_ref().map_or(0, |o| o.len()),
1062 contents.fills.as_ref().map_or(0, |f| f.len())
1063 );
1064
1065 let channel_data = DydxWsSubaccountsChannelData {
1066 msg_type: data.msg_type,
1067 connection_id: data.connection_id.clone(),
1068 message_id: data.message_id,
1069 id: data.id.clone().unwrap_or_default(),
1070 channel: data.channel,
1071 version: data.version.clone().unwrap_or_default(),
1072 contents,
1073 };
1074
1075 return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
1076 channel_data,
1077 ))));
1078 }
1079
1080 Ok(None)
1081 }
1082
1083 fn parse_subaccounts_subscribed(
1084 &self,
1085 msg: &DydxWsSubaccountsSubscribed,
1086 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1087 log::debug!("Forwarding subaccount subscription to execution client");
1090 Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
1091 msg.clone(),
1092 ))))
1093 }
1094
1095 fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1096 let symbol_with_perp = format!("{symbol}-PERP");
1099 Ok(parse_instrument_id(&symbol_with_perp))
1100 }
1101
1102 fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1103 self.instruments
1104 .get(&instrument_id.symbol.inner())
1105 .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1106 }
1107}
1108
1109fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1111 match error {
1112 DydxWsError::Transport(_) => true,
1113 DydxWsError::Send(_) => true,
1114 DydxWsError::ClientError(msg) => {
1115 let msg_lower = msg.to_lowercase();
1116 msg_lower.contains("timeout")
1117 || msg_lower.contains("timed out")
1118 || msg_lower.contains("connection")
1119 || msg_lower.contains("network")
1120 }
1121 DydxWsError::NotConnected
1122 | DydxWsError::Json(_)
1123 | DydxWsError::Parse(_)
1124 | DydxWsError::Authentication(_)
1125 | DydxWsError::Subscription(_)
1126 | DydxWsError::Venue(_) => false,
1127 }
1128}
1129
1130fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1132 DydxWsError::ClientError(msg)
1133}