1use std::{
25 fmt::Debug,
26 str::FromStr,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36 data::{
37 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38 bar::get_bar_interval_ns,
39 },
40 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41 identifiers::{AccountId, InstrumentId, TradeId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{
46 RECONNECTED,
47 retry::{RetryManager, create_websocket_retry_manager},
48 websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55 DydxWsError, DydxWsResult,
56 client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57 enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58 error::DydxWebSocketError,
59 messages::{
60 DydxBlockHeightChannelContents, DydxCandle, DydxMarketsContents, DydxOrderbookContents,
61 DydxOrderbookSnapshotContents, DydxTradeContents, DydxWsBlockHeightMessage,
62 DydxWsCandlesMessage, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
63 DydxWsFeedMessage, DydxWsGenericMsg, DydxWsMarketsMessage, DydxWsOrderbookMessage,
64 DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData, DydxWsSubaccountsMessage,
65 DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg, DydxWsTradesMessage,
66 },
67};
68use crate::common::parse::parse_instrument_id;
69
70#[derive(Debug, Clone)]
72pub enum HandlerCommand {
73 UpdateInstrument(Box<InstrumentAny>),
75 InitializeInstruments(Vec<InstrumentAny>),
77 RegisterBarType { topic: String, bar_type: BarType },
79 UnregisterBarType { topic: String },
81 SendText(String),
83}
84
85pub struct FeedHandler {
90 account_id: Option<AccountId>,
92 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
94 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
96 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
98 client: WebSocketClient,
100 signal: Arc<AtomicBool>,
102 retry_manager: RetryManager<DydxWsError>,
104 instruments: AHashMap<Ustr, InstrumentAny>,
106 bar_types: AHashMap<String, BarType>,
108 subscriptions: SubscriptionState,
110}
111
112impl Debug for FeedHandler {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("FeedHandler")
115 .field("account_id", &self.account_id)
116 .field("instruments_count", &self.instruments.len())
117 .field("bar_types_count", &self.bar_types.len())
118 .finish_non_exhaustive()
119 }
120}
121
122impl FeedHandler {
123 #[must_use]
125 pub fn new(
126 account_id: Option<AccountId>,
127 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
128 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
129 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
130 client: WebSocketClient,
131 signal: Arc<AtomicBool>,
132 subscriptions: SubscriptionState,
133 ) -> Self {
134 Self {
135 account_id,
136 cmd_rx,
137 out_tx,
138 raw_rx,
139 client,
140 signal,
141 retry_manager: create_websocket_retry_manager(),
142 instruments: AHashMap::new(),
143 bar_types: AHashMap::new(),
144 subscriptions,
145 }
146 }
147
148 async fn send_with_retry(
152 &self,
153 payload: String,
154 rate_limit_keys: Option<Vec<String>>,
155 ) -> Result<(), DydxWsError> {
156 self.retry_manager
157 .execute_with_retry(
158 "websocket_send",
159 || {
160 let payload = payload.clone();
161 let keys = rate_limit_keys.clone();
162 async move {
163 self.client
164 .send_text(payload, keys)
165 .await
166 .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
167 }
168 },
169 should_retry_dydx_error,
170 create_dydx_timeout_error,
171 )
172 .await
173 }
174
175 pub async fn run(&mut self) {
177 loop {
178 tokio::select! {
179 Some(cmd) = self.cmd_rx.recv() => {
180 self.handle_command(cmd).await;
181 }
182
183 Some(msg) = self.raw_rx.recv() => {
184 if let Some(nautilus_msg) = self.process_raw_message(msg).await
185 && self.out_tx.send(nautilus_msg).is_err()
186 {
187 tracing::debug!("Receiver dropped, stopping handler");
188 break;
189 }
190 }
191
192 else => {
193 tracing::debug!("Handler shutting down: channels closed");
194 break;
195 }
196 }
197
198 if self.signal.load(Ordering::Relaxed) {
199 tracing::debug!("Handler received stop signal");
200 break;
201 }
202 }
203 }
204
205 async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
207 match msg {
208 Message::Text(txt) => {
209 if txt == RECONNECTED {
210 if let Err(e) = self.replay_subscriptions().await {
211 tracing::error!("Failed to replay subscriptions after reconnect: {e}");
212 }
213 return Some(NautilusWsMessage::Reconnected);
214 }
215
216 match serde_json::from_str::<serde_json::Value>(&txt) {
217 Ok(val) => {
218 let val_clone = val.clone();
219
220 if let Ok(feed_msg) =
222 serde_json::from_value::<DydxWsFeedMessage>(val.clone())
223 {
224 return self.handle_feed_message(feed_msg).await;
225 }
226
227 match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
230 Ok(meta) => {
231 let result = if meta.is_connected() {
232 serde_json::from_value::<DydxWsConnectedMsg>(val)
233 .map(DydxWsMessage::Connected)
234 } else if meta.is_subscribed() {
235 if let Ok(sub_msg) =
236 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
237 {
238 if sub_msg.channel == DydxWsChannel::Subaccounts {
239 serde_json::from_value::<DydxWsSubaccountsSubscribed>(
240 val.clone(),
241 )
242 .map(DydxWsMessage::SubaccountsSubscribed)
243 .or_else(|e| {
244 tracing::debug!(
245 "Failed to parse subaccounts subscription (fallback to standard): {e}. Raw: {}",
246 serde_json::to_string(&val_clone)
247 .unwrap_or_else(|_| "<invalid json>".into())
248 );
249 Ok(DydxWsMessage::Subscribed(sub_msg))
250 })
251 } else {
252 Ok(DydxWsMessage::Subscribed(sub_msg))
253 }
254 } else {
255 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
256 .map(DydxWsMessage::Subscribed)
257 }
258 } else if meta.is_unsubscribed() {
259 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
260 .map(DydxWsMessage::Unsubscribed)
261 } else if meta.is_error() {
262 serde_json::from_value::<DydxWebSocketError>(val)
263 .map(DydxWsMessage::Error)
264 } else if meta.is_unknown() {
265 tracing::warn!(
266 "Received unknown WebSocket message type: {}",
267 serde_json::to_string(&val_clone)
268 .unwrap_or_else(|_| "<invalid json>".into())
269 );
270 Ok(DydxWsMessage::Raw(val))
271 } else {
272 Ok(DydxWsMessage::Raw(val))
273 };
274
275 match result {
276 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
277 Err(e) => {
278 tracing::error!(
279 "Failed to parse WebSocket message: {e}. Message type: {:?}, Channel: {:?}. Raw: {}",
280 meta.msg_type,
281 meta.channel,
282 serde_json::to_string(&val_clone)
283 .unwrap_or_else(|_| "<invalid json>".into())
284 );
285 None
286 }
287 }
288 }
289 Err(e) => {
290 let raw_json = serde_json::to_string_pretty(&val_clone)
291 .unwrap_or_else(|_| format!("{val_clone:?}"));
292 tracing::error!(
293 "Failed to parse WebSocket message envelope (DydxWsGenericMsg): {e}\nRaw JSON:\n{}",
294 raw_json
295 );
296 None
297 }
298 }
299 }
300 Err(e) => {
301 let err = DydxWebSocketError::from_message(e.to_string());
302 Some(NautilusWsMessage::Error(err))
303 }
304 }
305 }
306 Message::Pong(_data) => None,
307 Message::Ping(_data) => None, Message::Binary(_bin) => None, Message::Close(_frame) => {
310 tracing::info!("WebSocket close frame received");
311 None
312 }
313 Message::Frame(_) => None,
314 }
315 }
316
317 async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
319 match self.handle_message(msg).await {
320 Ok(opt_msg) => opt_msg,
321 Err(e) => {
322 tracing::error!("Error handling message: {e}");
323 None
324 }
325 }
326 }
327
328 async fn handle_feed_message(&self, feed_msg: DydxWsFeedMessage) -> Option<NautilusWsMessage> {
330 match feed_msg {
331 DydxWsFeedMessage::Subaccounts(msg) => match msg {
332 DydxWsSubaccountsMessage::Subscribed(data) => {
333 self.handle_dydx_message(DydxWsMessage::SubaccountsSubscribed(data))
334 .await
335 }
336 DydxWsSubaccountsMessage::ChannelData(data) => {
337 self.handle_dydx_message(DydxWsMessage::ChannelData(DydxWsChannelDataMsg {
338 msg_type: data.msg_type,
339 connection_id: data.connection_id,
340 message_id: data.message_id,
341 channel: data.channel,
342 id: Some(data.id),
343 contents: serde_json::to_value(&data.contents)
344 .unwrap_or(serde_json::Value::Null),
345 version: Some(data.version),
346 }))
347 .await
348 }
349 },
350 DydxWsFeedMessage::Orderbook(msg) => match msg {
351 DydxWsOrderbookMessage::Subscribed(data)
352 | DydxWsOrderbookMessage::ChannelData(data) => {
353 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
354 .await
355 }
356 DydxWsOrderbookMessage::ChannelBatchData(data) => {
357 self.handle_dydx_message(DydxWsMessage::ChannelBatchData(data))
358 .await
359 }
360 },
361 DydxWsFeedMessage::Trades(msg) => match msg {
362 DydxWsTradesMessage::Subscribed(data) | DydxWsTradesMessage::ChannelData(data) => {
363 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
364 .await
365 }
366 },
367 DydxWsFeedMessage::Markets(msg) => match msg {
368 DydxWsMarketsMessage::Subscribed(data)
369 | DydxWsMarketsMessage::ChannelData(data) => {
370 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
371 .await
372 }
373 },
374 DydxWsFeedMessage::Candles(msg) => match msg {
375 DydxWsCandlesMessage::Subscribed(data)
376 | DydxWsCandlesMessage::ChannelData(data) => {
377 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
378 .await
379 }
380 },
381 DydxWsFeedMessage::ParentSubaccounts(msg) => match msg {
382 super::messages::DydxWsParentSubaccountsMessage::Subscribed(data)
383 | super::messages::DydxWsParentSubaccountsMessage::ChannelData(data) => {
384 self.handle_dydx_message(DydxWsMessage::ChannelData(data))
385 .await
386 }
387 },
388 DydxWsFeedMessage::BlockHeight(msg) => match msg {
389 DydxWsBlockHeightMessage::Subscribed(data) => {
390 match data.contents.height.parse::<u64>() {
392 Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
393 Err(e) => {
394 tracing::warn!("Failed to parse block height from subscription: {e}");
395 None
396 }
397 }
398 }
399 DydxWsBlockHeightMessage::ChannelData(data) => {
400 match data.contents.block_height.parse::<u64>() {
402 Ok(height) => Some(NautilusWsMessage::BlockHeight(height)),
403 Err(e) => {
404 tracing::warn!("Failed to parse block height from channel data: {e}");
405 None
406 }
407 }
408 }
409 },
410 }
411 }
412
413 async fn handle_command(&mut self, command: HandlerCommand) {
415 match command {
416 HandlerCommand::UpdateInstrument(instrument) => {
417 let symbol = instrument.id().symbol.inner();
418 self.instruments.insert(symbol, *instrument);
419 }
420 HandlerCommand::InitializeInstruments(instruments) => {
421 for instrument in instruments {
422 let symbol = instrument.id().symbol.inner();
423 self.instruments.insert(symbol, instrument);
424 }
425 }
426 HandlerCommand::RegisterBarType { topic, bar_type } => {
427 self.bar_types.insert(topic, bar_type);
428 }
429 HandlerCommand::UnregisterBarType { topic } => {
430 self.bar_types.remove(&topic);
431 }
432 HandlerCommand::SendText(text) => {
433 if let Err(e) = self
434 .send_with_retry(
435 text,
436 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
437 )
438 .await
439 {
440 tracing::error!("Failed to send WebSocket text after retries: {e}");
441 }
442 }
443 }
444 }
445
446 pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
448 self.bar_types.insert(topic, bar_type);
449 }
450
451 pub fn unregister_bar_type(&mut self, topic: &str) {
453 self.bar_types.remove(topic);
454 }
455
456 fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
457 match id {
458 Some(id) => format!(
459 "{}{}{}",
460 channel.as_ref(),
461 self.subscriptions.delimiter(),
462 id
463 ),
464 None => channel.as_ref().to_string(),
465 }
466 }
467
468 fn subscription_from_topic(
469 &self,
470 topic: &str,
471 op: super::enums::DydxWsOperation,
472 ) -> Option<super::messages::DydxSubscription> {
473 let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
474 topic,
475 self.subscriptions.delimiter(),
476 );
477 let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
478 let id = symbol.map(std::string::ToString::to_string);
479
480 Some(super::messages::DydxSubscription { op, channel, id })
481 }
482
483 async fn replay_subscriptions(&self) -> DydxWsResult<()> {
484 let topics = self.subscriptions.all_topics();
485 for topic in topics {
486 let Some(subscription) =
487 self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
488 else {
489 tracing::warn!("Failed to reconstruct subscription from topic: {topic}");
490 continue;
491 };
492
493 let payload = serde_json::to_string(&subscription)?;
494 self.subscriptions.mark_subscribe(&topic);
495
496 if let Err(e) = self
497 .send_with_retry(
498 payload,
499 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
500 )
501 .await
502 {
503 self.subscriptions.mark_failure(&topic);
504 return Err(e);
505 }
506 }
507
508 Ok(())
509 }
510
511 #[allow(clippy::result_large_err)]
517 pub async fn handle_message(
518 &self,
519 msg: DydxWsMessage,
520 ) -> DydxWsResult<Option<NautilusWsMessage>> {
521 match msg {
522 DydxWsMessage::Connected(_) => {
523 tracing::info!("dYdX WebSocket connected");
524 Ok(None)
525 }
526 DydxWsMessage::Subscribed(sub) => {
527 tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
528 let topic = self.topic_from_msg(&sub.channel, &sub.id);
529 self.subscriptions.confirm_subscribe(&topic);
530 Ok(None)
531 }
532 DydxWsMessage::SubaccountsSubscribed(msg) => {
533 tracing::debug!("Subaccounts subscribed with initial state");
534 let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
535 self.subscriptions.confirm_subscribe(&topic);
536 self.parse_subaccounts_subscribed(&msg)
537 }
538 DydxWsMessage::Unsubscribed(unsub) => {
539 tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
540 let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
541 self.subscriptions.confirm_unsubscribe(&topic);
542 Ok(None)
543 }
544 DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
545 DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
546 DydxWsMessage::BlockHeight(height) => Ok(Some(NautilusWsMessage::BlockHeight(height))),
547 DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
548 DydxWsMessage::Reconnected => {
549 if let Err(e) = self.replay_subscriptions().await {
550 tracing::error!("Failed to replay subscriptions after reconnect message: {e}");
551 }
552 Ok(Some(NautilusWsMessage::Reconnected))
553 }
554 DydxWsMessage::Pong => Ok(None),
555 DydxWsMessage::Raw(_) => Ok(None),
556 }
557 }
558
559 fn handle_channel_data(
560 &self,
561 data: DydxWsChannelDataMsg,
562 ) -> DydxWsResult<Option<NautilusWsMessage>> {
563 match data.channel {
564 DydxWsChannel::Trades => self.parse_trades(&data),
565 DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
566 DydxWsChannel::Candles => self.parse_candles(&data),
567 DydxWsChannel::Markets => self.parse_markets(&data),
568 DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
569 self.parse_subaccounts(&data)
570 }
571 DydxWsChannel::BlockHeight => self.parse_block_height(&data),
572 DydxWsChannel::Unknown => {
573 tracing::warn!(
574 "Unknown channel data received: id={:?}, msg_type={:?}",
575 data.id,
576 data.msg_type
577 );
578 Ok(None)
579 }
580 }
581 }
582
583 fn handle_channel_batch_data(
584 &self,
585 data: DydxWsChannelBatchDataMsg,
586 ) -> DydxWsResult<Option<NautilusWsMessage>> {
587 match data.channel {
588 DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
589 _ => {
590 tracing::warn!(
591 "Unexpected batch data for channel: {:?}, id={:?}",
592 data.channel,
593 data.id
594 );
595 Ok(None)
596 }
597 }
598 }
599
600 fn parse_block_height(
601 &self,
602 data: &DydxWsChannelDataMsg,
603 ) -> DydxWsResult<Option<NautilusWsMessage>> {
604 let contents: DydxBlockHeightChannelContents =
605 serde_json::from_value(data.contents.clone()).map_err(|e| {
606 DydxWsError::Parse(format!("Failed to parse block height contents: {e}"))
607 })?;
608
609 let height = contents
610 .block_height
611 .parse::<u64>()
612 .map_err(|e| DydxWsError::Parse(format!("Failed to parse block height: {e}")))?;
613
614 Ok(Some(NautilusWsMessage::BlockHeight(height)))
615 }
616
617 fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
618 let symbol = data
619 .id
620 .as_ref()
621 .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
622
623 let instrument_id = self.parse_instrument_id(symbol)?;
624 let instrument = self.get_instrument(&instrument_id)?;
625
626 let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
627 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
628
629 let mut ticks = Vec::new();
630 let ts_init = get_atomic_clock_realtime().get_time_ns();
631
632 for trade in contents.trades {
633 let aggressor_side = match trade.side {
634 OrderSide::Buy => AggressorSide::Buyer,
635 OrderSide::Sell => AggressorSide::Seller,
636 _ => continue, };
638
639 let price = Decimal::from_str(&trade.price)
640 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
641
642 let size = Decimal::from_str(&trade.size)
643 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
644
645 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
646 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
647 })?;
648
649 let tick = TradeTick::new(
650 instrument_id,
651 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
652 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
653 })?,
654 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
655 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
656 })?,
657 aggressor_side,
658 TradeId::new(&trade.id),
659 UnixNanos::from(trade_ts as u64),
660 ts_init,
661 );
662 ticks.push(Data::Trade(tick));
663 }
664
665 if ticks.is_empty() {
666 Ok(None)
667 } else {
668 Ok(Some(NautilusWsMessage::Data(ticks)))
669 }
670 }
671
672 fn parse_orderbook(
673 &self,
674 data: &DydxWsChannelDataMsg,
675 is_snapshot: bool,
676 ) -> DydxWsResult<Option<NautilusWsMessage>> {
677 let symbol = data
678 .id
679 .as_ref()
680 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
681
682 let instrument_id = self.parse_instrument_id(symbol)?;
683 let instrument = self.get_instrument(&instrument_id)?;
684
685 let ts_init = get_atomic_clock_realtime().get_time_ns();
686
687 if is_snapshot {
688 let contents: DydxOrderbookSnapshotContents =
689 serde_json::from_value(data.contents.clone()).map_err(|e| {
690 DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
691 })?;
692
693 let deltas = self.parse_orderbook_snapshot(
694 &instrument_id,
695 &contents,
696 instrument.price_precision(),
697 instrument.size_precision(),
698 ts_init,
699 )?;
700
701 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
702 } else {
703 let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
704 .map_err(|e| {
705 DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
706 })?;
707
708 let deltas = self.parse_orderbook_deltas(
709 &instrument_id,
710 &contents,
711 instrument.price_precision(),
712 instrument.size_precision(),
713 ts_init,
714 )?;
715
716 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
717 }
718 }
719
720 fn parse_orderbook_batch(
721 &self,
722 data: &DydxWsChannelBatchDataMsg,
723 ) -> DydxWsResult<Option<NautilusWsMessage>> {
724 let symbol = data
725 .id
726 .as_ref()
727 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
728
729 let instrument_id = self.parse_instrument_id(symbol)?;
730 let instrument = self.get_instrument(&instrument_id)?;
731
732 let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
733 .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
734
735 let ts_init = get_atomic_clock_realtime().get_time_ns();
736 let mut all_deltas = Vec::new();
737
738 let num_messages = contents.len();
739 for (idx, content) in contents.iter().enumerate() {
740 let is_last_message = idx == num_messages - 1;
741 let deltas = self.parse_orderbook_deltas_with_flag(
742 &instrument_id,
743 content,
744 instrument.price_precision(),
745 instrument.size_precision(),
746 ts_init,
747 is_last_message,
748 )?;
749 all_deltas.extend(deltas);
750 }
751
752 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
753 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
754 }
755
756 fn parse_orderbook_snapshot(
757 &self,
758 instrument_id: &InstrumentId,
759 contents: &DydxOrderbookSnapshotContents,
760 price_precision: u8,
761 size_precision: u8,
762 ts_init: UnixNanos,
763 ) -> DydxWsResult<OrderBookDeltas> {
764 let mut deltas = Vec::new();
765 deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
766
767 let bids = contents.bids.as_deref().unwrap_or(&[]);
768 let asks = contents.asks.as_deref().unwrap_or(&[]);
769
770 let bids_len = bids.len();
771 let asks_len = asks.len();
772
773 for (idx, bid) in bids.iter().enumerate() {
774 let is_last = idx == bids_len - 1 && asks_len == 0;
775 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
776
777 let price = Decimal::from_str(&bid.price)
778 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
779
780 let size = Decimal::from_str(&bid.size)
781 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
782
783 let order = BookOrder::new(
784 OrderSide::Buy,
785 Price::from_decimal_dp(price, price_precision).map_err(|e| {
786 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
787 })?,
788 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
789 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
790 })?,
791 0,
792 );
793
794 deltas.push(OrderBookDelta::new(
795 *instrument_id,
796 BookAction::Add,
797 order,
798 flags,
799 0,
800 ts_init,
801 ts_init,
802 ));
803 }
804
805 for (idx, ask) in asks.iter().enumerate() {
806 let is_last = idx == asks_len - 1;
807 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
808
809 let price = Decimal::from_str(&ask.price)
810 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
811
812 let size = Decimal::from_str(&ask.size)
813 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
814
815 let order = BookOrder::new(
816 OrderSide::Sell,
817 Price::from_decimal_dp(price, price_precision).map_err(|e| {
818 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
819 })?,
820 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
821 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
822 })?,
823 0,
824 );
825
826 deltas.push(OrderBookDelta::new(
827 *instrument_id,
828 BookAction::Add,
829 order,
830 flags,
831 0,
832 ts_init,
833 ts_init,
834 ));
835 }
836
837 Ok(OrderBookDeltas::new(*instrument_id, deltas))
838 }
839
840 fn parse_orderbook_deltas(
841 &self,
842 instrument_id: &InstrumentId,
843 contents: &DydxOrderbookContents,
844 price_precision: u8,
845 size_precision: u8,
846 ts_init: UnixNanos,
847 ) -> DydxWsResult<OrderBookDeltas> {
848 let deltas = self.parse_orderbook_deltas_with_flag(
849 instrument_id,
850 contents,
851 price_precision,
852 size_precision,
853 ts_init,
854 true, )?;
856 Ok(OrderBookDeltas::new(*instrument_id, deltas))
857 }
858
859 #[allow(clippy::too_many_arguments)]
860 fn parse_orderbook_deltas_with_flag(
861 &self,
862 instrument_id: &InstrumentId,
863 contents: &DydxOrderbookContents,
864 price_precision: u8,
865 size_precision: u8,
866 ts_init: UnixNanos,
867 is_last_message: bool,
868 ) -> DydxWsResult<Vec<OrderBookDelta>> {
869 let mut deltas = Vec::new();
870
871 let bids = contents.bids.as_deref().unwrap_or(&[]);
872 let asks = contents.asks.as_deref().unwrap_or(&[]);
873
874 let bids_len = bids.len();
875 let asks_len = asks.len();
876
877 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
878 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
879 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
880
881 let price = Decimal::from_str(price_str)
882 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
883
884 let size = Decimal::from_str(size_str)
885 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
886
887 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
888 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
889 })?;
890 let action = if qty.is_zero() {
891 BookAction::Delete
892 } else {
893 BookAction::Update
894 };
895
896 let order = BookOrder::new(
897 OrderSide::Buy,
898 Price::from_decimal_dp(price, price_precision).map_err(|e| {
899 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
900 })?,
901 qty,
902 0,
903 );
904
905 deltas.push(OrderBookDelta::new(
906 *instrument_id,
907 action,
908 order,
909 flags,
910 0,
911 ts_init,
912 ts_init,
913 ));
914 }
915
916 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
917 let is_last = is_last_message && idx == asks_len - 1;
918 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
919
920 let price = Decimal::from_str(price_str)
921 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
922
923 let size = Decimal::from_str(size_str)
924 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
925
926 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
927 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
928 })?;
929 let action = if qty.is_zero() {
930 BookAction::Delete
931 } else {
932 BookAction::Update
933 };
934
935 let order = BookOrder::new(
936 OrderSide::Sell,
937 Price::from_decimal_dp(price, price_precision).map_err(|e| {
938 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
939 })?,
940 qty,
941 0,
942 );
943
944 deltas.push(OrderBookDelta::new(
945 *instrument_id,
946 action,
947 order,
948 flags,
949 0,
950 ts_init,
951 ts_init,
952 ));
953 }
954
955 Ok(deltas)
956 }
957
958 fn parse_candles(
959 &self,
960 data: &DydxWsChannelDataMsg,
961 ) -> DydxWsResult<Option<NautilusWsMessage>> {
962 let topic = data
963 .id
964 .as_ref()
965 .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
966
967 let bar_type = self.bar_types.get(topic).ok_or_else(|| {
968 DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
969 })?;
970
971 let candle: DydxCandle = serde_json::from_value(data.contents.clone())
972 .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
973
974 let instrument_id = self.parse_instrument_id(&candle.ticker)?;
975 let instrument = self.get_instrument(&instrument_id)?;
976
977 let open = Decimal::from_str(&candle.open)
978 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
979 let high = Decimal::from_str(&candle.high)
980 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
981 let low = Decimal::from_str(&candle.low)
982 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
983 let close = Decimal::from_str(&candle.close)
984 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
985 let volume = Decimal::from_str(&candle.base_token_volume)
986 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
987
988 let ts_init = get_atomic_clock_realtime().get_time_ns();
989
990 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
991 DydxWsError::Parse(format!(
992 "Timestamp out of range for candle at {}",
993 candle.started_at
994 ))
995 })?;
996 let interval_nanos = get_bar_interval_ns(bar_type);
997 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
998
999 let bar = Bar::new(
1000 *bar_type,
1001 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
1002 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
1003 })?,
1004 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
1005 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
1006 })?,
1007 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
1008 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
1009 })?,
1010 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
1011 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
1012 })?,
1013 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
1014 DydxWsError::Parse(format!(
1015 "Failed to create volume Quantity from decimal: {e}"
1016 ))
1017 })?,
1018 ts_event,
1019 ts_init,
1020 );
1021
1022 Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
1023 }
1024
1025 fn parse_markets(
1026 &self,
1027 data: &DydxWsChannelDataMsg,
1028 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1029 let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
1030 .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
1031
1032 if let Some(oracle_prices) = contents.oracle_prices {
1035 tracing::debug!(
1036 "Forwarding oracle price updates for {} markets to execution client",
1037 oracle_prices.len()
1038 );
1039 return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
1040 }
1041
1042 Ok(None)
1043 }
1044
1045 fn parse_subaccounts(
1046 &self,
1047 data: &DydxWsChannelDataMsg,
1048 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1049 let contents: DydxWsSubaccountsChannelContents =
1050 serde_json::from_value(data.contents.clone()).map_err(|e| {
1051 DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
1052 })?;
1053
1054 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
1055 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
1056
1057 if has_orders || has_fills {
1058 tracing::debug!(
1061 "Received {} order(s), {} fill(s) - forwarding to execution client",
1062 contents.orders.as_ref().map_or(0, |o| o.len()),
1063 contents.fills.as_ref().map_or(0, |f| f.len())
1064 );
1065
1066 let channel_data = DydxWsSubaccountsChannelData {
1067 msg_type: data.msg_type,
1068 connection_id: data.connection_id.clone(),
1069 message_id: data.message_id,
1070 id: data.id.clone().unwrap_or_default(),
1071 channel: data.channel,
1072 version: data.version.clone().unwrap_or_default(),
1073 contents,
1074 };
1075
1076 return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
1077 channel_data,
1078 ))));
1079 }
1080
1081 Ok(None)
1082 }
1083
1084 fn parse_subaccounts_subscribed(
1085 &self,
1086 msg: &DydxWsSubaccountsSubscribed,
1087 ) -> DydxWsResult<Option<NautilusWsMessage>> {
1088 tracing::debug!("Forwarding subaccount subscription to execution client");
1091 Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
1092 msg.clone(),
1093 ))))
1094 }
1095
1096 fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
1097 let symbol_with_perp = format!("{symbol}-PERP");
1100 Ok(parse_instrument_id(&symbol_with_perp))
1101 }
1102
1103 fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
1104 self.instruments
1105 .get(&instrument_id.symbol.inner())
1106 .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
1107 }
1108}
1109
1110fn should_retry_dydx_error(error: &DydxWsError) -> bool {
1112 match error {
1113 DydxWsError::Transport(_) => true,
1114 DydxWsError::Send(_) => true,
1115 DydxWsError::ClientError(msg) => {
1116 let msg_lower = msg.to_lowercase();
1117 msg_lower.contains("timeout")
1118 || msg_lower.contains("timed out")
1119 || msg_lower.contains("connection")
1120 || msg_lower.contains("network")
1121 }
1122 DydxWsError::NotConnected
1123 | DydxWsError::Json(_)
1124 | DydxWsError::Parse(_)
1125 | DydxWsError::Authentication(_)
1126 | DydxWsError::Subscription(_)
1127 | DydxWsError::Venue(_) => false,
1128 }
1129}
1130
1131fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1133 DydxWsError::ClientError(msg)
1134}