1use std::{
25 fmt::{Debug, Formatter},
26 str::FromStr,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
35use nautilus_model::{
36 data::{
37 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38 bar::get_bar_interval_ns,
39 },
40 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41 identifiers::{AccountId, InstrumentId, TradeId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{
46 RECONNECTED,
47 retry::{RetryManager, create_websocket_retry_manager},
48 websocket::{SubscriptionState, WebSocketClient},
49};
50use rust_decimal::Decimal;
51use tokio_tungstenite::tungstenite::Message;
52use ustr::Ustr;
53
54use super::{
55 DydxWsError, DydxWsResult,
56 client::DYDX_RATE_LIMIT_KEY_SUBSCRIPTION,
57 enums::{DydxWsChannel, DydxWsMessage, NautilusWsMessage},
58 error::DydxWebSocketError,
59 messages::{
60 DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
61 DydxTradeContents, DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg,
62 DydxWsGenericMsg, DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData,
63 DydxWsSubaccountsSubscribed, DydxWsSubscriptionMsg,
64 },
65};
66use crate::common::parse::parse_instrument_id;
67
/// Commands accepted by [`FeedHandler`] through its command channel.
///
/// Each variant is dispatched by `FeedHandler::handle_command`.
#[derive(Debug, Clone)]
pub enum HandlerCommand {
    /// Insert (or replace) one instrument in the handler's cache, keyed by its symbol.
    UpdateInstrument(Box<InstrumentAny>),
    /// Insert (or replace) a batch of instruments in the handler's cache.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Associate `bar_type` with `topic`; candle messages look their bar type up by topic.
    RegisterBarType { topic: String, bar_type: BarType },
    /// Remove the bar type registered for `topic`, if any.
    UnregisterBarType { topic: String },
    /// Send a text payload through the WebSocket client, using the handler's retry logic.
    SendText(String),
}
82
/// Processes raw dYdX WebSocket frames and handler commands, emitting
/// [`NautilusWsMessage`] values on an output channel.
pub struct FeedHandler {
    /// Optional account ID supplied at construction; only surfaced via `Debug` in this file.
    account_id: Option<AccountId>,
    /// Incoming [`HandlerCommand`]s, drained by `run`.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Outgoing parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Incoming raw WebSocket frames, drained by `run`.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Client used to send text payloads.
    client: WebSocketClient,
    /// Stop flag; `run` exits once it reads `true` after handling an event.
    signal: Arc<AtomicBool>,
    /// Retry manager used by `send_with_retry`.
    retry_manager: RetryManager<DydxWsError>,
    /// Instrument cache keyed by instrument symbol.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Bar types keyed by candle topic.
    bar_types: AHashMap<String, BarType>,
    /// Subscription bookkeeping: topic delimiter, confirmation marks and replay list.
    subscriptions: SubscriptionState,
}
109
110impl Debug for FeedHandler {
111 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("FeedHandler")
113 .field("account_id", &self.account_id)
114 .field("instruments_count", &self.instruments.len())
115 .field("bar_types_count", &self.bar_types.len())
116 .finish_non_exhaustive()
117 }
118}
119
120impl FeedHandler {
121 #[must_use]
123 pub fn new(
124 account_id: Option<AccountId>,
125 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
126 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
127 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
128 client: WebSocketClient,
129 signal: Arc<AtomicBool>,
130 subscriptions: SubscriptionState,
131 ) -> Self {
132 Self {
133 account_id,
134 cmd_rx,
135 out_tx,
136 raw_rx,
137 client,
138 signal,
139 retry_manager: create_websocket_retry_manager(),
140 instruments: AHashMap::new(),
141 bar_types: AHashMap::new(),
142 subscriptions,
143 }
144 }
145
146 async fn send_with_retry(
150 &self,
151 payload: String,
152 rate_limit_keys: Option<Vec<String>>,
153 ) -> Result<(), DydxWsError> {
154 self.retry_manager
155 .execute_with_retry(
156 "websocket_send",
157 || {
158 let payload = payload.clone();
159 let keys = rate_limit_keys.clone();
160 async move {
161 self.client
162 .send_text(payload, keys)
163 .await
164 .map_err(|e| DydxWsError::ClientError(format!("Send failed: {e}")))
165 }
166 },
167 should_retry_dydx_error,
168 create_dydx_timeout_error,
169 )
170 .await
171 }
172
173 pub async fn run(&mut self) {
175 loop {
176 tokio::select! {
177 Some(cmd) = self.cmd_rx.recv() => {
179 self.handle_command(cmd).await;
180 }
181
182 Some(msg) = self.raw_rx.recv() => {
184 if let Some(nautilus_msg) = self.process_raw_message(msg).await
185 && self.out_tx.send(nautilus_msg).is_err()
186 {
187 tracing::debug!("Receiver dropped, stopping handler");
188 break;
189 }
190 }
191
192 else => {
193 tracing::debug!("Handler shutting down: channels closed");
194 break;
195 }
196 }
197
198 if self.signal.load(Ordering::Relaxed) {
200 tracing::debug!("Handler received stop signal");
201 break;
202 }
203 }
204 }
205
206 async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
208 match msg {
209 Message::Text(txt) => {
210 if txt == RECONNECTED {
211 if let Err(e) = self.replay_subscriptions().await {
212 tracing::error!("Failed to replay subscriptions after reconnect: {e}");
213 }
214 return Some(NautilusWsMessage::Reconnected);
215 }
216
217 match serde_json::from_str::<serde_json::Value>(&txt) {
218 Ok(val) => {
219 match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
221 Ok(meta) => {
222 let result = if meta.is_connected() {
223 serde_json::from_value::<DydxWsConnectedMsg>(val)
224 .map(DydxWsMessage::Connected)
225 } else if meta.is_subscribed() {
226 if let Ok(sub_msg) =
228 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
229 {
230 if sub_msg.channel == DydxWsChannel::Subaccounts {
231 serde_json::from_value::<DydxWsSubaccountsSubscribed>(
233 val,
234 )
235 .map(DydxWsMessage::SubaccountsSubscribed)
236 } else {
237 Ok(DydxWsMessage::Subscribed(sub_msg))
238 }
239 } else {
240 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
241 .map(DydxWsMessage::Subscribed)
242 }
243 } else if meta.is_unsubscribed() {
244 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
245 .map(DydxWsMessage::Unsubscribed)
246 } else if meta.is_channel_data() {
247 serde_json::from_value::<DydxWsChannelDataMsg>(val)
248 .map(DydxWsMessage::ChannelData)
249 } else if meta.is_channel_batch_data() {
250 serde_json::from_value::<DydxWsChannelBatchDataMsg>(val)
251 .map(DydxWsMessage::ChannelBatchData)
252 } else if meta.is_error() {
253 serde_json::from_value::<DydxWebSocketError>(val)
254 .map(DydxWsMessage::Error)
255 } else if meta.is_unknown() {
256 tracing::debug!("Received unknown WebSocket message type");
257 Ok(DydxWsMessage::Raw(val))
258 } else {
259 Ok(DydxWsMessage::Raw(val))
260 };
261
262 match result {
263 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg).await,
264 Err(e) => {
265 tracing::warn!("Failed to parse WebSocket message: {e}");
266 None
267 }
268 }
269 }
270 Err(_) => {
271 None
273 }
274 }
275 }
276 Err(e) => {
277 let err = DydxWebSocketError::from_message(e.to_string());
278 Some(NautilusWsMessage::Error(err))
279 }
280 }
281 }
282 Message::Pong(_data) => None,
283 Message::Ping(_data) => None, Message::Binary(_bin) => None, Message::Close(_frame) => {
286 tracing::info!("WebSocket close frame received");
287 None
288 }
289 Message::Frame(_) => None,
290 }
291 }
292
293 async fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
295 match self.handle_message(msg).await {
296 Ok(opt_msg) => opt_msg,
297 Err(e) => {
298 tracing::error!("Error handling message: {e}");
299 None
300 }
301 }
302 }
303
304 async fn handle_command(&mut self, command: HandlerCommand) {
306 match command {
307 HandlerCommand::UpdateInstrument(instrument) => {
308 let symbol = instrument.id().symbol.inner();
309 self.instruments.insert(symbol, *instrument);
310 }
311 HandlerCommand::InitializeInstruments(instruments) => {
312 for instrument in instruments {
313 let symbol = instrument.id().symbol.inner();
314 self.instruments.insert(symbol, instrument);
315 }
316 }
317 HandlerCommand::RegisterBarType { topic, bar_type } => {
318 self.bar_types.insert(topic, bar_type);
319 }
320 HandlerCommand::UnregisterBarType { topic } => {
321 self.bar_types.remove(&topic);
322 }
323 HandlerCommand::SendText(text) => {
324 if let Err(e) = self
325 .send_with_retry(
326 text,
327 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
328 )
329 .await
330 {
331 tracing::error!("Failed to send WebSocket text after retries: {e}");
332 }
333 }
334 }
335 }
336
337 pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
339 self.bar_types.insert(topic, bar_type);
340 }
341
342 pub fn unregister_bar_type(&mut self, topic: &str) {
344 self.bar_types.remove(topic);
345 }
346
347 fn topic_from_msg(&self, channel: &super::enums::DydxWsChannel, id: &Option<String>) -> String {
348 match id {
349 Some(id) => format!(
350 "{}{}{}",
351 channel.as_ref(),
352 self.subscriptions.delimiter(),
353 id
354 ),
355 None => channel.as_ref().to_string(),
356 }
357 }
358
359 fn subscription_from_topic(
360 &self,
361 topic: &str,
362 op: super::enums::DydxWsOperation,
363 ) -> Option<super::messages::DydxSubscription> {
364 let (channel, symbol) = nautilus_network::websocket::subscription::split_topic(
365 topic,
366 self.subscriptions.delimiter(),
367 );
368 let channel = super::enums::DydxWsChannel::from_str(channel).ok()?;
369 let id = symbol.map(std::string::ToString::to_string);
370
371 Some(super::messages::DydxSubscription { op, channel, id })
372 }
373
374 async fn replay_subscriptions(&self) -> DydxWsResult<()> {
375 let topics = self.subscriptions.all_topics();
376 for topic in topics {
377 let Some(subscription) =
378 self.subscription_from_topic(&topic, super::enums::DydxWsOperation::Subscribe)
379 else {
380 tracing::warn!("Failed to reconstruct subscription from topic: {topic}");
381 continue;
382 };
383
384 let payload = serde_json::to_string(&subscription)?;
385 self.subscriptions.mark_subscribe(&topic);
386
387 if let Err(e) = self
388 .send_with_retry(
389 payload,
390 Some(vec![DYDX_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
391 )
392 .await
393 {
394 self.subscriptions.mark_failure(&topic);
395 return Err(e);
396 }
397 }
398
399 Ok(())
400 }
401
402 #[allow(clippy::result_large_err)]
408 pub async fn handle_message(
409 &self,
410 msg: DydxWsMessage,
411 ) -> DydxWsResult<Option<NautilusWsMessage>> {
412 match msg {
413 DydxWsMessage::Connected(_) => {
414 tracing::info!("dYdX WebSocket connected");
415 Ok(None)
416 }
417 DydxWsMessage::Subscribed(sub) => {
418 tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
419 let topic = self.topic_from_msg(&sub.channel, &sub.id);
420 self.subscriptions.confirm_subscribe(&topic);
421 Ok(None)
422 }
423 DydxWsMessage::SubaccountsSubscribed(msg) => {
424 tracing::debug!("Subaccounts subscribed with initial state");
425 let topic = self.topic_from_msg(&msg.channel, &Some(msg.id.clone()));
426 self.subscriptions.confirm_subscribe(&topic);
427 self.parse_subaccounts_subscribed(&msg)
428 }
429 DydxWsMessage::Unsubscribed(unsub) => {
430 tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
431 let topic = self.topic_from_msg(&unsub.channel, &unsub.id);
432 self.subscriptions.confirm_unsubscribe(&topic);
433 Ok(None)
434 }
435 DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
436 DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
437 DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
438 DydxWsMessage::Reconnected => {
439 if let Err(e) = self.replay_subscriptions().await {
440 tracing::error!("Failed to replay subscriptions after reconnect message: {e}");
441 }
442 Ok(Some(NautilusWsMessage::Reconnected))
443 }
444 DydxWsMessage::Pong => Ok(None),
445 DydxWsMessage::Raw(_) => Ok(None),
446 }
447 }
448
449 fn handle_channel_data(
450 &self,
451 data: DydxWsChannelDataMsg,
452 ) -> DydxWsResult<Option<NautilusWsMessage>> {
453 match data.channel {
454 DydxWsChannel::Trades => self.parse_trades(&data),
455 DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
456 DydxWsChannel::Candles => self.parse_candles(&data),
457 DydxWsChannel::Markets => self.parse_markets(&data),
458 DydxWsChannel::Subaccounts | DydxWsChannel::ParentSubaccounts => {
459 self.parse_subaccounts(&data)
460 }
461 DydxWsChannel::BlockHeight => {
462 tracing::debug!("Block height update received");
463 Ok(None)
464 }
465 DydxWsChannel::Unknown => {
466 tracing::debug!("Unknown channel data received");
467 Ok(None)
468 }
469 }
470 }
471
472 fn handle_channel_batch_data(
473 &self,
474 data: DydxWsChannelBatchDataMsg,
475 ) -> DydxWsResult<Option<NautilusWsMessage>> {
476 match data.channel {
477 DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
478 _ => {
479 tracing::warn!("Unexpected batch data for channel: {:?}", data.channel);
480 Ok(None)
481 }
482 }
483 }
484
485 fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
486 let symbol = data
487 .id
488 .as_ref()
489 .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
490
491 let instrument_id = self.parse_instrument_id(symbol)?;
492 let instrument = self.get_instrument(&instrument_id)?;
493
494 let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
495 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
496
497 let mut ticks = Vec::new();
498 let ts_init = get_atomic_clock_realtime().get_time_ns();
499
500 for trade in contents.trades {
501 let aggressor_side = match trade.side {
502 OrderSide::Buy => AggressorSide::Buyer,
503 OrderSide::Sell => AggressorSide::Seller,
504 _ => continue, };
506
507 let price = Decimal::from_str(&trade.price)
508 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
509
510 let size = Decimal::from_str(&trade.size)
511 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
512
513 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
514 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
515 })?;
516
517 let tick = TradeTick::new(
518 instrument_id,
519 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
520 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
521 })?,
522 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
523 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
524 })?,
525 aggressor_side,
526 TradeId::new(&trade.id),
527 UnixNanos::from(trade_ts as u64),
528 ts_init,
529 );
530 ticks.push(Data::Trade(tick));
531 }
532
533 if ticks.is_empty() {
534 Ok(None)
535 } else {
536 Ok(Some(NautilusWsMessage::Data(ticks)))
537 }
538 }
539
540 fn parse_orderbook(
541 &self,
542 data: &DydxWsChannelDataMsg,
543 is_snapshot: bool,
544 ) -> DydxWsResult<Option<NautilusWsMessage>> {
545 let symbol = data
546 .id
547 .as_ref()
548 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
549
550 let instrument_id = self.parse_instrument_id(symbol)?;
551 let instrument = self.get_instrument(&instrument_id)?;
552
553 let ts_init = get_atomic_clock_realtime().get_time_ns();
554
555 if is_snapshot {
556 let contents: DydxOrderbookSnapshotContents =
557 serde_json::from_value(data.contents.clone()).map_err(|e| {
558 DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
559 })?;
560
561 let deltas = self.parse_orderbook_snapshot(
562 &instrument_id,
563 &contents,
564 instrument.price_precision(),
565 instrument.size_precision(),
566 ts_init,
567 )?;
568
569 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
570 } else {
571 let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
572 .map_err(|e| {
573 DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
574 })?;
575
576 let deltas = self.parse_orderbook_deltas(
577 &instrument_id,
578 &contents,
579 instrument.price_precision(),
580 instrument.size_precision(),
581 ts_init,
582 )?;
583
584 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
585 }
586 }
587
588 fn parse_orderbook_batch(
589 &self,
590 data: &DydxWsChannelBatchDataMsg,
591 ) -> DydxWsResult<Option<NautilusWsMessage>> {
592 let symbol = data
593 .id
594 .as_ref()
595 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
596
597 let instrument_id = self.parse_instrument_id(symbol)?;
598 let instrument = self.get_instrument(&instrument_id)?;
599
600 let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
601 .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
602
603 let ts_init = get_atomic_clock_realtime().get_time_ns();
604 let mut all_deltas = Vec::new();
605
606 let num_messages = contents.len();
607 for (idx, content) in contents.iter().enumerate() {
608 let is_last_message = idx == num_messages - 1;
609 let deltas = self.parse_orderbook_deltas_with_flag(
610 &instrument_id,
611 content,
612 instrument.price_precision(),
613 instrument.size_precision(),
614 ts_init,
615 is_last_message,
616 )?;
617 all_deltas.extend(deltas);
618 }
619
620 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
621 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
622 }
623
624 fn parse_orderbook_snapshot(
625 &self,
626 instrument_id: &InstrumentId,
627 contents: &DydxOrderbookSnapshotContents,
628 price_precision: u8,
629 size_precision: u8,
630 ts_init: UnixNanos,
631 ) -> DydxWsResult<OrderBookDeltas> {
632 let mut deltas = Vec::new();
633
634 deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
636
637 let bids = contents.bids.as_deref().unwrap_or(&[]);
638 let asks = contents.asks.as_deref().unwrap_or(&[]);
639
640 let bids_len = bids.len();
641 let asks_len = asks.len();
642
643 for (idx, bid) in bids.iter().enumerate() {
644 let is_last = idx == bids_len - 1 && asks_len == 0;
645 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
646
647 let price = Decimal::from_str(&bid.price)
648 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
649
650 let size = Decimal::from_str(&bid.size)
651 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
652
653 let order = BookOrder::new(
654 OrderSide::Buy,
655 Price::from_decimal_dp(price, price_precision).map_err(|e| {
656 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
657 })?,
658 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
659 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
660 })?,
661 0,
662 );
663
664 deltas.push(OrderBookDelta::new(
665 *instrument_id,
666 BookAction::Add,
667 order,
668 flags,
669 0,
670 ts_init,
671 ts_init,
672 ));
673 }
674
675 for (idx, ask) in asks.iter().enumerate() {
676 let is_last = idx == asks_len - 1;
677 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
678
679 let price = Decimal::from_str(&ask.price)
680 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
681
682 let size = Decimal::from_str(&ask.size)
683 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
684
685 let order = BookOrder::new(
686 OrderSide::Sell,
687 Price::from_decimal_dp(price, price_precision).map_err(|e| {
688 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
689 })?,
690 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
691 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
692 })?,
693 0,
694 );
695
696 deltas.push(OrderBookDelta::new(
697 *instrument_id,
698 BookAction::Add,
699 order,
700 flags,
701 0,
702 ts_init,
703 ts_init,
704 ));
705 }
706
707 Ok(OrderBookDeltas::new(*instrument_id, deltas))
708 }
709
710 fn parse_orderbook_deltas(
711 &self,
712 instrument_id: &InstrumentId,
713 contents: &DydxOrderbookContents,
714 price_precision: u8,
715 size_precision: u8,
716 ts_init: UnixNanos,
717 ) -> DydxWsResult<OrderBookDeltas> {
718 let deltas = self.parse_orderbook_deltas_with_flag(
719 instrument_id,
720 contents,
721 price_precision,
722 size_precision,
723 ts_init,
724 true, )?;
726 Ok(OrderBookDeltas::new(*instrument_id, deltas))
727 }
728
729 #[allow(clippy::too_many_arguments)]
730 fn parse_orderbook_deltas_with_flag(
731 &self,
732 instrument_id: &InstrumentId,
733 contents: &DydxOrderbookContents,
734 price_precision: u8,
735 size_precision: u8,
736 ts_init: UnixNanos,
737 is_last_message: bool,
738 ) -> DydxWsResult<Vec<OrderBookDelta>> {
739 let mut deltas = Vec::new();
740
741 let bids = contents.bids.as_deref().unwrap_or(&[]);
742 let asks = contents.asks.as_deref().unwrap_or(&[]);
743
744 let bids_len = bids.len();
745 let asks_len = asks.len();
746
747 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
748 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
749 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
750
751 let price = Decimal::from_str(price_str)
752 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
753
754 let size = Decimal::from_str(size_str)
755 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
756
757 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
758 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
759 })?;
760 let action = if qty.is_zero() {
761 BookAction::Delete
762 } else {
763 BookAction::Update
764 };
765
766 let order = BookOrder::new(
767 OrderSide::Buy,
768 Price::from_decimal_dp(price, price_precision).map_err(|e| {
769 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
770 })?,
771 qty,
772 0,
773 );
774
775 deltas.push(OrderBookDelta::new(
776 *instrument_id,
777 action,
778 order,
779 flags,
780 0,
781 ts_init,
782 ts_init,
783 ));
784 }
785
786 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
787 let is_last = is_last_message && idx == asks_len - 1;
788 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
789
790 let price = Decimal::from_str(price_str)
791 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
792
793 let size = Decimal::from_str(size_str)
794 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
795
796 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
797 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
798 })?;
799 let action = if qty.is_zero() {
800 BookAction::Delete
801 } else {
802 BookAction::Update
803 };
804
805 let order = BookOrder::new(
806 OrderSide::Sell,
807 Price::from_decimal_dp(price, price_precision).map_err(|e| {
808 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
809 })?,
810 qty,
811 0,
812 );
813
814 deltas.push(OrderBookDelta::new(
815 *instrument_id,
816 action,
817 order,
818 flags,
819 0,
820 ts_init,
821 ts_init,
822 ));
823 }
824
825 Ok(deltas)
826 }
827
828 fn parse_candles(
829 &self,
830 data: &DydxWsChannelDataMsg,
831 ) -> DydxWsResult<Option<NautilusWsMessage>> {
832 let topic = data
833 .id
834 .as_ref()
835 .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
836
837 let bar_type = self.bar_types.get(topic).ok_or_else(|| {
838 DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
839 })?;
840
841 let candle: DydxCandle = serde_json::from_value(data.contents.clone())
842 .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
843
844 let instrument_id = self.parse_instrument_id(&candle.ticker)?;
845 let instrument = self.get_instrument(&instrument_id)?;
846
847 let open = Decimal::from_str(&candle.open)
848 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
849 let high = Decimal::from_str(&candle.high)
850 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
851 let low = Decimal::from_str(&candle.low)
852 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
853 let close = Decimal::from_str(&candle.close)
854 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
855 let volume = Decimal::from_str(&candle.base_token_volume)
856 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
857
858 let ts_init = get_atomic_clock_realtime().get_time_ns();
859
860 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
862 DydxWsError::Parse(format!(
863 "Timestamp out of range for candle at {}",
864 candle.started_at
865 ))
866 })?;
867 let interval_nanos = get_bar_interval_ns(bar_type);
868 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
869
870 let bar = Bar::new(
871 *bar_type,
872 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
873 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
874 })?,
875 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
876 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
877 })?,
878 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
879 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
880 })?,
881 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
882 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
883 })?,
884 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
885 DydxWsError::Parse(format!(
886 "Failed to create volume Quantity from decimal: {e}"
887 ))
888 })?,
889 ts_event,
890 ts_init,
891 );
892
893 Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
894 }
895
896 fn parse_markets(
897 &self,
898 data: &DydxWsChannelDataMsg,
899 ) -> DydxWsResult<Option<NautilusWsMessage>> {
900 let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
901 .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
902
903 if let Some(oracle_prices) = contents.oracle_prices {
906 tracing::debug!(
907 "Forwarding oracle price updates for {} markets to execution client",
908 oracle_prices.len()
909 );
910 return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
911 }
912
913 Ok(None)
914 }
915
916 fn parse_subaccounts(
917 &self,
918 data: &DydxWsChannelDataMsg,
919 ) -> DydxWsResult<Option<NautilusWsMessage>> {
920 let contents: DydxWsSubaccountsChannelContents =
921 serde_json::from_value(data.contents.clone()).map_err(|e| {
922 DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
923 })?;
924
925 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
927 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
928
929 if has_orders || has_fills {
930 tracing::debug!(
933 "Received {} order(s), {} fill(s) - forwarding to execution client",
934 contents.orders.as_ref().map_or(0, |o| o.len()),
935 contents.fills.as_ref().map_or(0, |f| f.len())
936 );
937
938 let channel_data = DydxWsSubaccountsChannelData {
939 msg_type: data.msg_type,
940 connection_id: data.connection_id.clone(),
941 message_id: data.message_id,
942 id: data.id.clone().unwrap_or_default(),
943 channel: data.channel,
944 version: data.version.clone().unwrap_or_default(),
945 contents,
946 };
947
948 return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
949 channel_data,
950 ))));
951 }
952
953 Ok(None)
954 }
955
956 fn parse_subaccounts_subscribed(
957 &self,
958 msg: &DydxWsSubaccountsSubscribed,
959 ) -> DydxWsResult<Option<NautilusWsMessage>> {
960 tracing::debug!("Forwarding subaccount subscription to execution client");
963 Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
964 msg.clone(),
965 ))))
966 }
967
968 fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
969 let symbol_with_perp = format!("{symbol}-PERP");
972 Ok(parse_instrument_id(&symbol_with_perp))
973 }
974
975 fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
976 self.instruments
977 .get(&instrument_id.symbol.inner())
978 .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
979 }
980}
981
982fn should_retry_dydx_error(error: &DydxWsError) -> bool {
984 match error {
985 DydxWsError::Transport(_) => true,
986 DydxWsError::Send(_) => true,
987 DydxWsError::ClientError(msg) => {
988 let msg_lower = msg.to_lowercase();
989 msg_lower.contains("timeout")
990 || msg_lower.contains("timed out")
991 || msg_lower.contains("connection")
992 || msg_lower.contains("network")
993 }
994 DydxWsError::NotConnected
995 | DydxWsError::Json(_)
996 | DydxWsError::Parse(_)
997 | DydxWsError::Authentication(_)
998 | DydxWsError::Subscription(_)
999 | DydxWsError::Venue(_) => false,
1000 }
1001}
1002
1003fn create_dydx_timeout_error(msg: String) -> DydxWsError {
1005 DydxWsError::ClientError(msg)
1006}