1use std::{
25 fmt::{Debug, Formatter},
26 str::FromStr,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_core::nanos::UnixNanos;
35use nautilus_model::{
36 data::{
37 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick,
38 bar::get_bar_interval_ns,
39 },
40 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41 identifiers::{AccountId, InstrumentId, TradeId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
46use rust_decimal::Decimal;
47use tokio_tungstenite::tungstenite::Message;
48use ustr::Ustr;
49
50use super::{
51 DydxWsError, DydxWsResult,
52 enums::DydxWsChannel,
53 messages::{
54 DydxWsChannelBatchDataMsg, DydxWsChannelDataMsg, DydxWsConnectedMsg, DydxWsGenericMsg,
55 DydxWsMessage, DydxWsSubscriptionMsg, NautilusWsMessage,
56 },
57 types::{
58 DydxCandle, DydxMarketsContents, DydxOrderbookContents, DydxOrderbookSnapshotContents,
59 DydxTradeContents,
60 },
61};
62
/// Commands accepted by [`FeedHandler`] on its command channel.
#[derive(Debug, Clone)]
pub enum HandlerCommand {
    /// Insert (or replace) a single instrument in the handler's cache, keyed by its symbol.
    UpdateInstrument(Box<InstrumentAny>),
    /// Insert (or replace) every instrument of the list in the handler's cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Associate `topic` with the bar type looked up when a candle arrives for that topic.
    RegisterBarType { topic: String, bar_type: BarType },
    /// Remove the bar type associated with `topic`, if any.
    UnregisterBarType { topic: String },
    /// Send the given text through the handler's WebSocket client.
    SendText(String),
}
77
/// Converts raw dYdX WebSocket messages into [`NautilusWsMessage`] values.
///
/// The handler owns the receiving ends of a command channel and a raw message channel,
/// and publishes its results on an outbound channel (see [`FeedHandler::run`]).
pub struct FeedHandler {
    /// Optional account identifier supplied at construction.
    account_id: Option<AccountId>,
    /// Receives [`HandlerCommand`] values.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Outbound channel for parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Receives raw WebSocket messages to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// WebSocket client used to send text for [`HandlerCommand::SendText`].
    client: WebSocketClient,
    /// Stop flag; the run loop exits once it reads `true`.
    signal: Arc<AtomicBool>,
    /// Instrument cache keyed by the instrument's symbol.
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Bar types keyed by candles topic.
    bar_types: AHashMap<String, BarType>,
}
100
101impl Debug for FeedHandler {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("FeedHandler")
104 .field("account_id", &self.account_id)
105 .field("instruments_count", &self.instruments.len())
106 .field("bar_types_count", &self.bar_types.len())
107 .finish_non_exhaustive()
108 }
109}
110
111impl FeedHandler {
112 #[must_use]
114 pub fn new(
115 account_id: Option<AccountId>,
116 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
117 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
118 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
119 client: WebSocketClient,
120 signal: Arc<AtomicBool>,
121 ) -> Self {
122 Self {
123 account_id,
124 cmd_rx,
125 out_tx,
126 raw_rx,
127 client,
128 signal,
129 instruments: AHashMap::new(),
130 bar_types: AHashMap::new(),
131 }
132 }
133
/// Runs the handler loop until one of its stop conditions is met.
///
/// Each iteration waits for either a command or a raw WebSocket message. The loop ends
/// when a parsed message cannot be sent on the outbound channel, when the `else` branch
/// of the select fires, or when the stop flag reads `true` after an iteration.
pub async fn run(&mut self) {
    loop {
        tokio::select! {
            // Control commands (instrument cache, bar types, outgoing text)
            Some(cmd) = self.cmd_rx.recv() => {
                self.handle_command(cmd).await;
            }

            // Raw messages: parse, then forward only when parsing produced an output
            Some(msg) = self.raw_rx.recv() => {
                if let Some(nautilus_msg) = self.process_raw_message(msg).await
                    && self.out_tx.send(nautilus_msg).is_err()
                {
                    tracing::debug!("Receiver dropped, stopping handler");
                    break;
                }
            }

            // Reached when neither pattern above can match any more
            else => {
                tracing::debug!("Handler shutting down: channels closed");
                break;
            }
        }

        // The stop flag is only examined after a select branch has completed
        if self.signal.load(Ordering::Relaxed) {
            tracing::debug!("Handler received stop signal");
            break;
        }
    }
}
166
167 async fn process_raw_message(&self, msg: Message) -> Option<NautilusWsMessage> {
169 match msg {
170 Message::Text(txt) => {
171 if txt == RECONNECTED {
172 return Some(NautilusWsMessage::Reconnected);
173 }
174
175 match serde_json::from_str::<serde_json::Value>(&txt) {
176 Ok(val) => {
177 match serde_json::from_value::<DydxWsGenericMsg>(val.clone()) {
179 Ok(meta) => {
180 let result = if meta.is_connected() {
181 serde_json::from_value::<DydxWsConnectedMsg>(val)
182 .map(DydxWsMessage::Connected)
183 } else if meta.is_subscribed() {
184 if let Ok(sub_msg) =
186 serde_json::from_value::<DydxWsSubscriptionMsg>(val.clone())
187 {
188 if sub_msg.channel == DydxWsChannel::Subaccounts {
189 serde_json::from_value::<
191 crate::schemas::ws::DydxWsSubaccountsSubscribed,
192 >(val)
193 .map(DydxWsMessage::SubaccountsSubscribed)
194 } else {
195 Ok(DydxWsMessage::Subscribed(sub_msg))
196 }
197 } else {
198 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
199 .map(DydxWsMessage::Subscribed)
200 }
201 } else if meta.is_unsubscribed() {
202 serde_json::from_value::<DydxWsSubscriptionMsg>(val)
203 .map(DydxWsMessage::Unsubscribed)
204 } else if meta.is_channel_data() {
205 serde_json::from_value::<DydxWsChannelDataMsg>(val)
206 .map(DydxWsMessage::ChannelData)
207 } else if meta.is_channel_batch_data() {
208 serde_json::from_value::<DydxWsChannelBatchDataMsg>(val)
209 .map(DydxWsMessage::ChannelBatchData)
210 } else if meta.is_error() {
211 serde_json::from_value::<
212 crate::websocket::error::DydxWebSocketError,
213 >(val)
214 .map(DydxWsMessage::Error)
215 } else {
216 Ok(DydxWsMessage::Raw(val))
217 };
218
219 match result {
220 Ok(dydx_msg) => self.handle_dydx_message(dydx_msg),
221 Err(e) => {
222 let err = crate::websocket::error::DydxWebSocketError::from_message(
223 e.to_string(),
224 );
225 Some(NautilusWsMessage::Error(err))
226 }
227 }
228 }
229 Err(_) => {
230 None
232 }
233 }
234 }
235 Err(e) => {
236 let err = crate::websocket::error::DydxWebSocketError::from_message(
237 e.to_string(),
238 );
239 Some(NautilusWsMessage::Error(err))
240 }
241 }
242 }
243 Message::Pong(_data) => None,
244 Message::Ping(_data) => None, Message::Binary(_bin) => None, Message::Close(_frame) => {
247 tracing::info!("WebSocket close frame received");
248 None
249 }
250 Message::Frame(_) => None,
251 }
252 }
253
254 fn handle_dydx_message(&self, msg: DydxWsMessage) -> Option<NautilusWsMessage> {
256 match self.handle_message(msg) {
257 Ok(opt_msg) => opt_msg,
258 Err(e) => {
259 tracing::error!("Error handling message: {e}");
260 None
261 }
262 }
263 }
264
265 async fn handle_command(&mut self, command: HandlerCommand) {
267 match command {
268 HandlerCommand::UpdateInstrument(instrument) => {
269 let symbol = instrument.id().symbol.inner();
270 self.instruments.insert(symbol, *instrument);
271 }
272 HandlerCommand::InitializeInstruments(instruments) => {
273 for instrument in instruments {
274 let symbol = instrument.id().symbol.inner();
275 self.instruments.insert(symbol, instrument);
276 }
277 }
278 HandlerCommand::RegisterBarType { topic, bar_type } => {
279 self.bar_types.insert(topic, bar_type);
280 }
281 HandlerCommand::UnregisterBarType { topic } => {
282 self.bar_types.remove(&topic);
283 }
284 HandlerCommand::SendText(text) => {
285 if let Err(e) = self.client.send_text(text, None).await {
286 tracing::error!("Failed to send WebSocket text: {e}");
287 }
288 }
289 }
290 }
291
292 pub fn register_bar_type(&mut self, topic: String, bar_type: BarType) {
294 self.bar_types.insert(topic, bar_type);
295 }
296
297 pub fn unregister_bar_type(&mut self, topic: &str) {
299 self.bar_types.remove(topic);
300 }
301
302 #[allow(clippy::result_large_err)]
308 pub fn handle_message(&self, msg: DydxWsMessage) -> DydxWsResult<Option<NautilusWsMessage>> {
309 match msg {
310 DydxWsMessage::Connected(_) => {
311 tracing::info!("dYdX WebSocket connected");
312 Ok(None)
313 }
314 DydxWsMessage::Subscribed(sub) => {
315 tracing::debug!("Subscribed to {} (id: {:?})", sub.channel, sub.id);
316 Ok(None)
317 }
318 DydxWsMessage::SubaccountsSubscribed(msg) => {
319 tracing::debug!("Subaccounts subscribed with initial state");
320 self.parse_subaccounts_subscribed(&msg)
321 }
322 DydxWsMessage::Unsubscribed(unsub) => {
323 tracing::debug!("Unsubscribed from {} (id: {:?})", unsub.channel, unsub.id);
324 Ok(None)
325 }
326 DydxWsMessage::ChannelData(data) => self.handle_channel_data(data),
327 DydxWsMessage::ChannelBatchData(data) => self.handle_channel_batch_data(data),
328 DydxWsMessage::Error(err) => Ok(Some(NautilusWsMessage::Error(err))),
329 DydxWsMessage::Reconnected => Ok(Some(NautilusWsMessage::Reconnected)),
330 DydxWsMessage::Pong => Ok(None),
331 DydxWsMessage::Raw(_) => Ok(None),
332 }
333 }
334
335 fn handle_channel_data(
336 &self,
337 data: DydxWsChannelDataMsg,
338 ) -> DydxWsResult<Option<NautilusWsMessage>> {
339 match data.channel {
340 DydxWsChannel::Trades => self.parse_trades(&data),
341 DydxWsChannel::Orderbook => self.parse_orderbook(&data, false),
342 DydxWsChannel::Candles => self.parse_candles(&data),
343 DydxWsChannel::Markets => self.parse_markets(&data),
344 DydxWsChannel::Subaccounts => self.parse_subaccounts(&data),
345 DydxWsChannel::BlockHeight => {
346 tracing::debug!("Block height update received");
347 Ok(None)
348 }
349 }
350 }
351
352 fn handle_channel_batch_data(
353 &self,
354 data: DydxWsChannelBatchDataMsg,
355 ) -> DydxWsResult<Option<NautilusWsMessage>> {
356 match data.channel {
357 DydxWsChannel::Orderbook => self.parse_orderbook_batch(&data),
358 _ => {
359 tracing::warn!("Unexpected batch data for channel: {:?}", data.channel);
360 Ok(None)
361 }
362 }
363 }
364
365 fn parse_trades(&self, data: &DydxWsChannelDataMsg) -> DydxWsResult<Option<NautilusWsMessage>> {
366 let symbol = data
367 .id
368 .as_ref()
369 .ok_or_else(|| DydxWsError::Parse("Missing id for trades channel".into()))?;
370
371 let instrument_id = self.parse_instrument_id(symbol)?;
372 let instrument = self.get_instrument(&instrument_id)?;
373
374 let contents: DydxTradeContents = serde_json::from_value(data.contents.clone())
375 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade contents: {e}")))?;
376
377 let mut ticks = Vec::new();
378 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
379
380 for trade in contents.trades {
381 let aggressor_side = match trade.side {
382 OrderSide::Buy => AggressorSide::Buyer,
383 OrderSide::Sell => AggressorSide::Seller,
384 _ => continue, };
386
387 let price = Decimal::from_str(&trade.price)
388 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
389
390 let size = Decimal::from_str(&trade.size)
391 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
392
393 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
394 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
395 })?;
396
397 let tick = TradeTick::new(
398 instrument_id,
399 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
400 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
401 })?,
402 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
403 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
404 })?,
405 aggressor_side,
406 TradeId::new(&trade.id),
407 UnixNanos::from(trade_ts as u64),
408 ts_init,
409 );
410 ticks.push(Data::Trade(tick));
411 }
412
413 if ticks.is_empty() {
414 Ok(None)
415 } else {
416 Ok(Some(NautilusWsMessage::Data(ticks)))
417 }
418 }
419
420 fn parse_orderbook(
421 &self,
422 data: &DydxWsChannelDataMsg,
423 is_snapshot: bool,
424 ) -> DydxWsResult<Option<NautilusWsMessage>> {
425 let symbol = data
426 .id
427 .as_ref()
428 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook channel".into()))?;
429
430 let instrument_id = self.parse_instrument_id(symbol)?;
431 let instrument = self.get_instrument(&instrument_id)?;
432
433 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
434
435 if is_snapshot {
436 let contents: DydxOrderbookSnapshotContents =
437 serde_json::from_value(data.contents.clone()).map_err(|e| {
438 DydxWsError::Parse(format!("Failed to parse orderbook snapshot: {e}"))
439 })?;
440
441 let deltas = self.parse_orderbook_snapshot(
442 &instrument_id,
443 &contents,
444 instrument.price_precision(),
445 instrument.size_precision(),
446 ts_init,
447 )?;
448
449 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
450 } else {
451 let contents: DydxOrderbookContents = serde_json::from_value(data.contents.clone())
452 .map_err(|e| {
453 DydxWsError::Parse(format!("Failed to parse orderbook contents: {e}"))
454 })?;
455
456 let deltas = self.parse_orderbook_deltas(
457 &instrument_id,
458 &contents,
459 instrument.price_precision(),
460 instrument.size_precision(),
461 ts_init,
462 )?;
463
464 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
465 }
466 }
467
468 fn parse_orderbook_batch(
469 &self,
470 data: &DydxWsChannelBatchDataMsg,
471 ) -> DydxWsResult<Option<NautilusWsMessage>> {
472 let symbol = data
473 .id
474 .as_ref()
475 .ok_or_else(|| DydxWsError::Parse("Missing id for orderbook batch channel".into()))?;
476
477 let instrument_id = self.parse_instrument_id(symbol)?;
478 let instrument = self.get_instrument(&instrument_id)?;
479
480 let contents: Vec<DydxOrderbookContents> = serde_json::from_value(data.contents.clone())
481 .map_err(|e| DydxWsError::Parse(format!("Failed to parse orderbook batch: {e}")))?;
482
483 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
484 let mut all_deltas = Vec::new();
485
486 let num_messages = contents.len();
487 for (idx, content) in contents.iter().enumerate() {
488 let is_last_message = idx == num_messages - 1;
489 let deltas = self.parse_orderbook_deltas_with_flag(
490 &instrument_id,
491 content,
492 instrument.price_precision(),
493 instrument.size_precision(),
494 ts_init,
495 is_last_message,
496 )?;
497 all_deltas.extend(deltas);
498 }
499
500 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
501 Ok(Some(NautilusWsMessage::Deltas(Box::new(deltas))))
502 }
503
504 fn parse_orderbook_snapshot(
505 &self,
506 instrument_id: &InstrumentId,
507 contents: &DydxOrderbookSnapshotContents,
508 price_precision: u8,
509 size_precision: u8,
510 ts_init: UnixNanos,
511 ) -> DydxWsResult<OrderBookDeltas> {
512 let mut deltas = Vec::new();
513
514 deltas.push(OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init));
516
517 let bids = contents.bids.as_deref().unwrap_or(&[]);
518 let asks = contents.asks.as_deref().unwrap_or(&[]);
519
520 let bids_len = bids.len();
521 let asks_len = asks.len();
522
523 for (idx, bid) in bids.iter().enumerate() {
524 let is_last = idx == bids_len - 1 && asks_len == 0;
525 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
526
527 let price = Decimal::from_str(&bid.price)
528 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
529
530 let size = Decimal::from_str(&bid.size)
531 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
532
533 let order = BookOrder::new(
534 OrderSide::Buy,
535 Price::from_decimal_dp(price, price_precision).map_err(|e| {
536 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
537 })?,
538 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
539 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
540 })?,
541 0,
542 );
543
544 deltas.push(OrderBookDelta::new(
545 *instrument_id,
546 BookAction::Add,
547 order,
548 flags,
549 0,
550 ts_init,
551 ts_init,
552 ));
553 }
554
555 for (idx, ask) in asks.iter().enumerate() {
556 let is_last = idx == asks_len - 1;
557 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
558
559 let price = Decimal::from_str(&ask.price)
560 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
561
562 let size = Decimal::from_str(&ask.size)
563 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
564
565 let order = BookOrder::new(
566 OrderSide::Sell,
567 Price::from_decimal_dp(price, price_precision).map_err(|e| {
568 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
569 })?,
570 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
571 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
572 })?,
573 0,
574 );
575
576 deltas.push(OrderBookDelta::new(
577 *instrument_id,
578 BookAction::Add,
579 order,
580 flags,
581 0,
582 ts_init,
583 ts_init,
584 ));
585 }
586
587 Ok(OrderBookDeltas::new(*instrument_id, deltas))
588 }
589
590 fn parse_orderbook_deltas(
591 &self,
592 instrument_id: &InstrumentId,
593 contents: &DydxOrderbookContents,
594 price_precision: u8,
595 size_precision: u8,
596 ts_init: UnixNanos,
597 ) -> DydxWsResult<OrderBookDeltas> {
598 let deltas = self.parse_orderbook_deltas_with_flag(
599 instrument_id,
600 contents,
601 price_precision,
602 size_precision,
603 ts_init,
604 true, )?;
606 Ok(OrderBookDeltas::new(*instrument_id, deltas))
607 }
608
609 #[allow(clippy::too_many_arguments)]
610 fn parse_orderbook_deltas_with_flag(
611 &self,
612 instrument_id: &InstrumentId,
613 contents: &DydxOrderbookContents,
614 price_precision: u8,
615 size_precision: u8,
616 ts_init: UnixNanos,
617 is_last_message: bool,
618 ) -> DydxWsResult<Vec<OrderBookDelta>> {
619 let mut deltas = Vec::new();
620
621 let bids = contents.bids.as_deref().unwrap_or(&[]);
622 let asks = contents.asks.as_deref().unwrap_or(&[]);
623
624 let bids_len = bids.len();
625 let asks_len = asks.len();
626
627 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
628 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
629 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
630
631 let price = Decimal::from_str(price_str)
632 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
633
634 let size = Decimal::from_str(size_str)
635 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
636
637 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
638 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
639 })?;
640 let action = if qty.is_zero() {
641 BookAction::Delete
642 } else {
643 BookAction::Update
644 };
645
646 let order = BookOrder::new(
647 OrderSide::Buy,
648 Price::from_decimal_dp(price, price_precision).map_err(|e| {
649 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
650 })?,
651 qty,
652 0,
653 );
654
655 deltas.push(OrderBookDelta::new(
656 *instrument_id,
657 action,
658 order,
659 flags,
660 0,
661 ts_init,
662 ts_init,
663 ));
664 }
665
666 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
667 let is_last = is_last_message && idx == asks_len - 1;
668 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
669
670 let price = Decimal::from_str(price_str)
671 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
672
673 let size = Decimal::from_str(size_str)
674 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
675
676 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
677 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
678 })?;
679 let action = if qty.is_zero() {
680 BookAction::Delete
681 } else {
682 BookAction::Update
683 };
684
685 let order = BookOrder::new(
686 OrderSide::Sell,
687 Price::from_decimal_dp(price, price_precision).map_err(|e| {
688 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
689 })?,
690 qty,
691 0,
692 );
693
694 deltas.push(OrderBookDelta::new(
695 *instrument_id,
696 action,
697 order,
698 flags,
699 0,
700 ts_init,
701 ts_init,
702 ));
703 }
704
705 Ok(deltas)
706 }
707
708 fn parse_candles(
709 &self,
710 data: &DydxWsChannelDataMsg,
711 ) -> DydxWsResult<Option<NautilusWsMessage>> {
712 let topic = data
713 .id
714 .as_ref()
715 .ok_or_else(|| DydxWsError::Parse("Missing id for candles channel".into()))?;
716
717 let bar_type = self.bar_types.get(topic).ok_or_else(|| {
718 DydxWsError::Parse(format!("No bar type registered for topic: {topic}"))
719 })?;
720
721 let candle: DydxCandle = serde_json::from_value(data.contents.clone())
722 .map_err(|e| DydxWsError::Parse(format!("Failed to parse candle contents: {e}")))?;
723
724 let instrument_id = self.parse_instrument_id(&candle.ticker)?;
725 let instrument = self.get_instrument(&instrument_id)?;
726
727 let open = Decimal::from_str(&candle.open)
728 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
729 let high = Decimal::from_str(&candle.high)
730 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
731 let low = Decimal::from_str(&candle.low)
732 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
733 let close = Decimal::from_str(&candle.close)
734 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
735 let volume = Decimal::from_str(&candle.base_token_volume)
736 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?;
737
738 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
739
740 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
742 DydxWsError::Parse(format!(
743 "Timestamp out of range for candle at {}",
744 candle.started_at
745 ))
746 })?;
747 let interval_nanos = get_bar_interval_ns(bar_type);
748 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_nanos;
749
750 let bar = Bar::new(
751 *bar_type,
752 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
753 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
754 })?,
755 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
756 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
757 })?,
758 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
759 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
760 })?,
761 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
762 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
763 })?,
764 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
765 DydxWsError::Parse(format!(
766 "Failed to create volume Quantity from decimal: {e}"
767 ))
768 })?,
769 ts_event,
770 ts_init,
771 );
772
773 Ok(Some(NautilusWsMessage::Data(vec![Data::Bar(bar)])))
774 }
775
776 fn parse_markets(
777 &self,
778 data: &DydxWsChannelDataMsg,
779 ) -> DydxWsResult<Option<NautilusWsMessage>> {
780 let contents: DydxMarketsContents = serde_json::from_value(data.contents.clone())
781 .map_err(|e| DydxWsError::Parse(format!("Failed to parse markets contents: {e}")))?;
782
783 if let Some(oracle_prices) = contents.oracle_prices {
786 tracing::debug!(
787 "Forwarding oracle price updates for {} markets to execution client",
788 oracle_prices.len()
789 );
790 return Ok(Some(NautilusWsMessage::OraclePrices(oracle_prices)));
791 }
792
793 Ok(None)
794 }
795
796 fn parse_subaccounts(
797 &self,
798 data: &DydxWsChannelDataMsg,
799 ) -> DydxWsResult<Option<NautilusWsMessage>> {
800 use crate::schemas::ws::{DydxWsSubaccountsChannelContents, DydxWsSubaccountsChannelData};
801
802 let contents: DydxWsSubaccountsChannelContents =
803 serde_json::from_value(data.contents.clone()).map_err(|e| {
804 DydxWsError::Parse(format!("Failed to parse subaccounts contents: {e}"))
805 })?;
806
807 let has_orders = contents.orders.as_ref().is_some_and(|o| !o.is_empty());
809 let has_fills = contents.fills.as_ref().is_some_and(|f| !f.is_empty());
810
811 if has_orders || has_fills {
812 tracing::debug!(
815 "Received {} order(s), {} fill(s) - forwarding to execution client",
816 contents.orders.as_ref().map_or(0, |o| o.len()),
817 contents.fills.as_ref().map_or(0, |f| f.len())
818 );
819
820 let channel_data = DydxWsSubaccountsChannelData {
821 msg_type: data.msg_type,
822 connection_id: data.connection_id.clone(),
823 message_id: data.message_id,
824 id: data.id.clone().unwrap_or_default(),
825 channel: data.channel,
826 version: data.version.clone().unwrap_or_default(),
827 contents,
828 };
829
830 return Ok(Some(NautilusWsMessage::SubaccountsChannelData(Box::new(
831 channel_data,
832 ))));
833 }
834
835 Ok(None)
836 }
837
838 fn parse_subaccounts_subscribed(
839 &self,
840 msg: &crate::schemas::ws::DydxWsSubaccountsSubscribed,
841 ) -> DydxWsResult<Option<NautilusWsMessage>> {
842 tracing::debug!("Forwarding subaccount subscription to execution client");
845 Ok(Some(NautilusWsMessage::SubaccountSubscribed(Box::new(
846 msg.clone(),
847 ))))
848 }
849
850 fn parse_instrument_id(&self, symbol: &str) -> DydxWsResult<InstrumentId> {
851 let symbol_with_perp = format!("{symbol}-PERP");
854 Ok(crate::common::parse::parse_instrument_id(&symbol_with_perp))
855 }
856
857 fn get_instrument(&self, instrument_id: &InstrumentId) -> DydxWsResult<&InstrumentAny> {
858 self.instruments
859 .get(&instrument_id.symbol.inner())
860 .ok_or_else(|| DydxWsError::Parse(format!("No instrument cached for {instrument_id}")))
861 }
862}