1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::{DashMap, DashSet};
25use futures_util::{Stream, StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent, DataResponse,
31 data::{
32 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
33 RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
34 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument, SubscribeInstruments,
35 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
36 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
37 UnsubscribeInstrument, UnsubscribeInstruments, UnsubscribeMarkPrices,
38 UnsubscribeQuotes, UnsubscribeTrades,
39 },
40 },
41};
42use nautilus_core::{
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47 data::{
48 Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, OrderBookDelta,
49 OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
50 },
51 enums::{BookAction, BookType, OrderSide, RecordFlag},
52 identifiers::{ClientId, InstrumentId, Venue},
53 instruments::{Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 types::Quantity,
56};
57use tokio::{task::JoinHandle, time::Duration};
58use tokio_util::sync::CancellationToken;
59
60use crate::{
61 common::{
62 consts::DYDX_VENUE, enums::DydxCandleResolution, instrument_cache::InstrumentCache,
63 parse::extract_raw_symbol,
64 },
65 config::DydxDataClientConfig,
66 http::client::DydxHttpClient,
67 websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage, handler::HandlerCommand},
68};
69
70struct WsMessageContext {
72 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
73 instrument_cache: Arc<InstrumentCache>,
74 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
75 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
76 ws_client: DydxWebSocketClient,
77 http_client: DydxHttpClient,
78 active_quote_subs: Arc<DashSet<InstrumentId>>,
79 active_delta_subs: Arc<DashSet<InstrumentId>>,
80 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
81 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
82 incomplete_bars: Arc<DashMap<BarType, Bar>>,
83 active_mark_price_subs: Arc<DashSet<InstrumentId>>,
84 active_index_price_subs: Arc<DashSet<InstrumentId>>,
85 active_funding_rate_subs: Arc<DashSet<InstrumentId>>,
86}
87
88#[derive(Debug)]
96pub struct DydxDataClient {
97 clock: &'static AtomicTime,
99 client_id: ClientId,
101 config: DydxDataClientConfig,
103 http_client: DydxHttpClient,
105 ws_client: DydxWebSocketClient,
107 is_connected: AtomicBool,
109 cancellation_token: CancellationToken,
111 tasks: Vec<JoinHandle<()>>,
113 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
115 instrument_cache: Arc<InstrumentCache>,
117 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
119 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
121 incomplete_bars: Arc<DashMap<BarType, Bar>>,
125 bar_type_mappings: Arc<DashMap<String, BarType>>,
129 active_quote_subs: Arc<DashSet<InstrumentId>>,
131 active_delta_subs: Arc<DashSet<InstrumentId>>,
133 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
135 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
137 active_mark_price_subs: Arc<DashSet<InstrumentId>>,
139 active_index_price_subs: Arc<DashSet<InstrumentId>>,
141 active_funding_rate_subs: Arc<DashSet<InstrumentId>>,
143}
144
145impl DydxDataClient {
146 fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
147 let resolution: &'static str = DydxCandleResolution::from_bar_spec(spec)?.into();
148 Ok(resolution)
149 }
150
151 pub fn new(
157 client_id: ClientId,
158 config: DydxDataClientConfig,
159 http_client: DydxHttpClient,
160 ws_client: DydxWebSocketClient,
161 ) -> anyhow::Result<Self> {
162 let clock = get_atomic_clock_realtime();
163 let data_sender = get_data_event_sender();
164
165 let instrument_cache = Arc::clone(http_client.instrument_cache());
167
168 Ok(Self {
169 clock,
170 client_id,
171 config,
172 http_client,
173 ws_client,
174 is_connected: AtomicBool::new(false),
175 cancellation_token: CancellationToken::new(),
176 tasks: Vec::new(),
177 data_sender,
178 instrument_cache,
179 order_books: Arc::new(DashMap::new()),
180 last_quotes: Arc::new(DashMap::new()),
181 incomplete_bars: Arc::new(DashMap::new()),
182 bar_type_mappings: Arc::new(DashMap::new()),
183 active_quote_subs: Arc::new(DashSet::new()),
184 active_delta_subs: Arc::new(DashSet::new()),
185 active_trade_subs: Arc::new(DashMap::new()),
186 active_bar_subs: Arc::new(DashMap::new()),
187 active_mark_price_subs: Arc::new(DashSet::new()),
188 active_index_price_subs: Arc::new(DashSet::new()),
189 active_funding_rate_subs: Arc::new(DashSet::new()),
190 })
191 }
192
193 #[must_use]
195 pub fn venue(&self) -> Venue {
196 *DYDX_VENUE
197 }
198
199 #[must_use]
201 pub fn config(&self) -> &DydxDataClientConfig {
202 &self.config
203 }
204
205 #[must_use]
207 pub fn is_connected(&self) -> bool {
208 self.is_connected.load(Ordering::Relaxed)
209 }
210
211 fn spawn_ws<F>(&self, fut: F, context: &'static str)
215 where
216 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
217 {
218 get_runtime().spawn(async move {
219 if let Err(e) = fut.await {
220 log::error!("{context}: {e:?}");
221 }
222 });
223 }
224
225 fn spawn_ws_stream_handler(
227 &mut self,
228 stream: impl Stream<Item = NautilusWsMessage> + Send + 'static,
229 ctx: WsMessageContext,
230 ) {
231 let cancellation = self.cancellation_token.clone();
232
233 let handle = get_runtime().spawn(async move {
234 log::debug!("Message processing task started");
235 pin_mut!(stream);
236
237 loop {
238 tokio::select! {
239 maybe_msg = stream.next() => {
240 match maybe_msg {
241 Some(msg) => Self::handle_ws_message(msg, &ctx),
242 None => {
243 log::debug!("WebSocket message channel closed");
244 break;
245 }
246 }
247 }
248 () = cancellation.cancelled() => {
249 log::debug!("WebSocket message task cancelled");
250 break;
251 }
252 }
253 }
254 log::debug!("WebSocket stream handler ended");
255 });
256
257 self.tasks.push(handle);
258 }
259
260 async fn await_tasks_with_timeout(&mut self, timeout: Duration) {
266 for handle in self.tasks.drain(..) {
267 let _ = tokio::time::timeout(timeout, handle).await;
268 }
269 }
270
271 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
286 self.http_client
288 .fetch_and_cache_instruments()
289 .await
290 .context("failed to load instruments from dYdX")?;
291
292 let instruments: Vec<InstrumentAny> = self.http_client.all_instruments();
293
294 if instruments.is_empty() {
295 log::warn!("No instruments were loaded");
296 return Ok(instruments);
297 }
298
299 log::info!("Loaded {} instruments into shared cache", instruments.len());
300
301 self.ws_client.cache_instruments(instruments.clone());
303
304 for instrument in &instruments {
306 if let Err(e) = self
307 .data_sender
308 .send(DataEvent::Instrument(instrument.clone()))
309 {
310 log::warn!("Failed to publish instrument {}: {e}", instrument.id());
311 }
312 }
313 log::debug!("Published {} instruments to data engine", instruments.len());
314
315 Ok(instruments)
316 }
317}
318
319#[async_trait::async_trait(?Send)]
320impl DataClient for DydxDataClient {
321 fn client_id(&self) -> ClientId {
322 self.client_id
323 }
324
325 fn venue(&self) -> Option<Venue> {
326 Some(*DYDX_VENUE)
327 }
328
329 fn start(&mut self) -> anyhow::Result<()> {
330 log::info!(
331 "Starting: client_id={}, is_testnet={}",
332 self.client_id,
333 self.http_client.is_testnet()
334 );
335 Ok(())
336 }
337
338 fn stop(&mut self) -> anyhow::Result<()> {
339 log::info!("Stopping {}", self.client_id);
340 self.cancellation_token.cancel();
341 self.is_connected.store(false, Ordering::Relaxed);
342 Ok(())
343 }
344
345 fn reset(&mut self) -> anyhow::Result<()> {
346 log::debug!("Resetting {}", self.client_id);
347 self.is_connected.store(false, Ordering::Relaxed);
348 self.cancellation_token = CancellationToken::new();
349 for handle in self.tasks.drain(..) {
351 handle.abort();
352 }
353 Ok(())
354 }
355
356 fn dispose(&mut self) -> anyhow::Result<()> {
357 log::debug!("Disposing {}", self.client_id);
358 self.stop()
359 }
360
361 async fn connect(&mut self) -> anyhow::Result<()> {
362 if self.is_connected() {
363 return Ok(());
364 }
365
366 log::info!("Connecting");
367
368 self.bootstrap_instruments().await?;
370
371 self.ws_client
373 .connect()
374 .await
375 .context("failed to connect dYdX websocket")?;
376
377 self.ws_client
378 .subscribe_markets()
379 .await
380 .context("failed to subscribe to markets channel")?;
381
382 let ctx = WsMessageContext {
384 data_sender: self.data_sender.clone(),
385 instrument_cache: self.instrument_cache.clone(),
386 order_books: self.order_books.clone(),
387 last_quotes: self.last_quotes.clone(),
388 ws_client: self.ws_client.clone(),
389 http_client: self.http_client.clone(),
390 active_quote_subs: self.active_quote_subs.clone(),
391 active_delta_subs: self.active_delta_subs.clone(),
392 active_trade_subs: self.active_trade_subs.clone(),
393 active_bar_subs: self.active_bar_subs.clone(),
394 incomplete_bars: self.incomplete_bars.clone(),
395 active_mark_price_subs: self.active_mark_price_subs.clone(),
396 active_index_price_subs: self.active_index_price_subs.clone(),
397 active_funding_rate_subs: self.active_funding_rate_subs.clone(),
398 };
399
400 let stream = self.ws_client.stream();
401 self.spawn_ws_stream_handler(stream, ctx);
402
403 self.is_connected.store(true, Ordering::Relaxed);
404 log::info!("Connected");
405
406 Ok(())
407 }
408
409 async fn disconnect(&mut self) -> anyhow::Result<()> {
410 if !self.is_connected() {
411 return Ok(());
412 }
413
414 log::info!("Disconnecting");
415
416 self.cancellation_token.cancel();
418
419 self.await_tasks_with_timeout(Duration::from_secs(5)).await;
421
422 self.ws_client
423 .disconnect()
424 .await
425 .context("failed to disconnect dYdX websocket")?;
426
427 self.is_connected.store(false, Ordering::Relaxed);
428 log::info!("Disconnected dYdX data client");
429
430 Ok(())
431 }
432
433 fn is_connected(&self) -> bool {
434 self.is_connected.load(Ordering::Relaxed)
435 }
436
437 fn is_disconnected(&self) -> bool {
438 !self.is_connected()
439 }
440
441 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
442 log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
446 Ok(())
447 }
448
449 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
450 log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
453 Ok(())
454 }
455
456 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
457 log::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
460 Ok(())
461 }
462
463 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
464 if let Some(instrument) = self.instrument_cache.get(&cmd.instrument_id) {
467 log::debug!("Sending cached instrument for {}", cmd.instrument_id);
468 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
469 log::warn!("Failed to send instrument {}: {e}", cmd.instrument_id);
470 }
471 } else {
472 log::warn!(
473 "Instrument {} not found in cache (available: {})",
474 cmd.instrument_id,
475 self.instrument_cache.len()
476 );
477 }
478 Ok(())
479 }
480
481 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
482 let instrument_id = cmd.instrument_id;
483 self.active_mark_price_subs.insert(instrument_id);
484 log::info!("Subscribed to mark prices for {instrument_id} (via v4_markets channel)");
485 Ok(())
486 }
487
488 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
489 let instrument_id = cmd.instrument_id;
490 self.active_index_price_subs.insert(instrument_id);
491 log::info!("Subscribed to index prices for {instrument_id} (via v4_markets channel)");
492 Ok(())
493 }
494
495 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
496 let instrument_id = cmd.instrument_id;
497 self.active_funding_rate_subs.insert(instrument_id);
498 log::info!("Subscribed to funding rates for {instrument_id} (via v4_markets channel)");
499 Ok(())
500 }
501
502 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
503 self.active_mark_price_subs.remove(&cmd.instrument_id);
504 log::info!("Unsubscribed from mark prices for {}", cmd.instrument_id);
505 Ok(())
506 }
507
508 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
509 self.active_index_price_subs.remove(&cmd.instrument_id);
510 log::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
511 Ok(())
512 }
513
514 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
515 self.active_funding_rate_subs.remove(&cmd.instrument_id);
516 log::info!("Unsubscribed from funding rates for {}", cmd.instrument_id);
517 Ok(())
518 }
519
520 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
521 let ws = self.ws_client.clone();
522 let instrument_id = cmd.instrument_id;
523
524 self.active_trade_subs.insert(instrument_id, ());
526
527 self.spawn_ws(
528 async move {
529 ws.subscribe_trades(instrument_id)
530 .await
531 .context("trade subscription")
532 },
533 "dYdX trade subscription",
534 );
535
536 Ok(())
537 }
538
539 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
540 if cmd.book_type != BookType::L2_MBP {
541 anyhow::bail!(
542 "dYdX only supports L2_MBP order book deltas, received {:?}",
543 cmd.book_type
544 );
545 }
546
547 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
549
550 self.active_delta_subs.insert(cmd.instrument_id);
552
553 let ws = self.ws_client.clone();
554 let instrument_id = cmd.instrument_id;
555
556 self.spawn_ws(
557 async move {
558 ws.subscribe_orderbook(instrument_id)
559 .await
560 .context("orderbook subscription")
561 },
562 "dYdX orderbook subscription",
563 );
564
565 Ok(())
566 }
567
568 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
569 log::debug!(
572 "Subscribe_quotes for {}: subscribing to orderbook WS channel for quote synthesis",
573 cmd.instrument_id
574 );
575
576 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
577 self.active_quote_subs.insert(cmd.instrument_id);
578 let ws = self.ws_client.clone();
579 let instrument_id = cmd.instrument_id;
580
581 self.spawn_ws(
582 async move {
583 ws.subscribe_orderbook(instrument_id)
584 .await
585 .context("orderbook subscription (for quotes)")
586 },
587 "dYdX orderbook subscription (quotes)",
588 );
589
590 Ok(())
591 }
592
593 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
594 let ws = self.ws_client.clone();
595 let instrument_id = cmd.bar_type.instrument_id();
596 let spec = cmd.bar_type.spec();
597
598 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
600
601 let bar_type = cmd.bar_type;
603 self.active_bar_subs
604 .insert((instrument_id, resolution.to_string()), bar_type);
605
606 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
608 let topic = format!("{ticker}/{resolution}");
609 self.bar_type_mappings.insert(topic.clone(), bar_type);
610
611 self.spawn_ws(
612 async move {
613 if let Err(e) = ws.send_command(HandlerCommand::RegisterBarType { topic, bar_type })
615 {
616 anyhow::bail!("Failed to register bar type: {e}");
617 }
618
619 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
621
622 ws.subscribe_candles(instrument_id, resolution)
623 .await
624 .context("candles subscription")
625 },
626 "dYdX candles subscription",
627 );
628
629 Ok(())
630 }
631
632 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
633 self.active_trade_subs.remove(&cmd.instrument_id);
635
636 let ws = self.ws_client.clone();
637 let instrument_id = cmd.instrument_id;
638
639 self.spawn_ws(
640 async move {
641 ws.unsubscribe_trades(instrument_id)
642 .await
643 .context("trade unsubscription")
644 },
645 "dYdX trade unsubscription",
646 );
647
648 Ok(())
649 }
650
651 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
652 self.active_delta_subs.remove(&cmd.instrument_id);
654
655 let ws = self.ws_client.clone();
656 let instrument_id = cmd.instrument_id;
657
658 self.spawn_ws(
659 async move {
660 ws.unsubscribe_orderbook(instrument_id)
661 .await
662 .context("orderbook unsubscription")
663 },
664 "dYdX orderbook unsubscription",
665 );
666
667 Ok(())
668 }
669
670 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
671 log::debug!(
672 "unsubscribe_quotes for {}: removing quote subscription",
673 cmd.instrument_id
674 );
675
676 self.active_quote_subs.remove(&cmd.instrument_id);
678
679 let ws = self.ws_client.clone();
682 let instrument_id = cmd.instrument_id;
683
684 self.spawn_ws(
685 async move {
686 ws.unsubscribe_orderbook(instrument_id)
687 .await
688 .context("orderbook unsubscription (for quotes)")
689 },
690 "dYdX orderbook unsubscription (quotes)",
691 );
692
693 Ok(())
694 }
695
696 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
697 let ws = self.ws_client.clone();
698 let instrument_id = cmd.bar_type.instrument_id();
699 let spec = cmd.bar_type.spec();
700
701 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
702
703 self.active_bar_subs
705 .remove(&(instrument_id, resolution.to_string()));
706
707 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
709 let topic = format!("{ticker}/{resolution}");
710 self.bar_type_mappings.remove(&topic);
711
712 if let Err(e) = ws.send_command(HandlerCommand::UnregisterBarType { topic }) {
713 log::warn!("Failed to unregister bar type: {e}");
714 }
715
716 self.spawn_ws(
717 async move {
718 ws.unsubscribe_candles(instrument_id, resolution)
719 .await
720 .context("candles unsubscription")
721 },
722 "dYdX candles unsubscription",
723 );
724
725 Ok(())
726 }
727
728 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
729 if request.start.is_some() {
730 log::warn!(
731 "Requesting instrument {} with specified `start` which has no effect",
732 request.instrument_id
733 );
734 }
735 if request.end.is_some() {
736 log::warn!(
737 "Requesting instrument {} with specified `end` which has no effect",
738 request.instrument_id
739 );
740 }
741
742 let instrument_cache = self.instrument_cache.clone();
743 let sender = self.data_sender.clone();
744 let http = self.http_client.clone();
745 let instrument_id = request.instrument_id;
746 let request_id = request.request_id;
747 let client_id = request.client_id.unwrap_or(self.client_id);
748 let start = request.start;
749 let end = request.end;
750 let params = request.params;
751 let clock = self.clock;
752 let start_nanos = datetime_to_unix_nanos(start);
753 let end_nanos = datetime_to_unix_nanos(end);
754
755 get_runtime().spawn(async move {
756 let instrument = if let Some(cached) = instrument_cache.get(&instrument_id) {
758 log::debug!("Found instrument {instrument_id} in cache");
759 Some(cached)
760 } else {
761 log::debug!("Instrument {instrument_id} not in cache, fetching from API");
763 match http.request_instruments(None, None, None).await {
764 Ok(instruments) => {
765 for inst in &instruments {
767 instrument_cache.insert_instrument_only(inst.clone());
768 }
769 instruments.into_iter().find(|i| i.id() == instrument_id)
771 }
772 Err(e) => {
773 log::error!("Failed to fetch instruments from dYdX: {e:?}");
774 None
775 }
776 }
777 };
778
779 if let Some(inst) = instrument {
780 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
781 request_id,
782 client_id,
783 instrument_id,
784 inst,
785 start_nanos,
786 end_nanos,
787 clock.get_time_ns(),
788 params,
789 )));
790
791 if let Err(e) = sender.send(DataEvent::Response(response)) {
792 log::error!("Failed to send instrument response: {e}");
793 }
794 } else {
795 log::error!("Instrument {instrument_id} not found");
796 }
797 });
798
799 Ok(())
800 }
801
802 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
803 let http = self.http_client.clone();
804 let sender = self.data_sender.clone();
805 let instrument_cache = self.instrument_cache.clone();
806 let request_id = request.request_id;
807 let client_id = request.client_id.unwrap_or(self.client_id);
808 let venue = self.venue();
809 let start = request.start;
810 let end = request.end;
811 let params = request.params;
812 let clock = self.clock;
813 let start_nanos = datetime_to_unix_nanos(start);
814 let end_nanos = datetime_to_unix_nanos(end);
815
816 get_runtime().spawn(async move {
817 match http.request_instruments(None, None, None).await {
818 Ok(instruments) => {
819 log::info!("Fetched {} instruments from dYdX", instruments.len());
820
821 for instrument in &instruments {
823 instrument_cache.insert_instrument_only(instrument.clone());
824 }
825
826 let response = DataResponse::Instruments(InstrumentsResponse::new(
827 request_id,
828 client_id,
829 venue,
830 instruments,
831 start_nanos,
832 end_nanos,
833 clock.get_time_ns(),
834 params,
835 ));
836
837 if let Err(e) = sender.send(DataEvent::Response(response)) {
838 log::error!("Failed to send instruments response: {e}");
839 }
840 }
841 Err(e) => {
842 log::error!("Failed to fetch instruments from dYdX: {e:?}");
843
844 let response = DataResponse::Instruments(InstrumentsResponse::new(
846 request_id,
847 client_id,
848 venue,
849 Vec::new(),
850 start_nanos,
851 end_nanos,
852 clock.get_time_ns(),
853 params,
854 ));
855
856 if let Err(e) = sender.send(DataEvent::Response(response)) {
857 log::error!("Failed to send empty instruments response: {e}");
858 }
859 }
860 }
861 });
862
863 Ok(())
864 }
865
866 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
867 let http_client = self.http_client.clone();
868 let sender = self.data_sender.clone();
869 let instrument_id = request.instrument_id;
870 let start = request.start;
871 let end = request.end;
872 let limit = request.limit.map(|n| n.get() as u32);
873 let request_id = request.request_id;
874 let client_id = request.client_id.unwrap_or(self.client_id);
875 let params = request.params;
876 let clock = self.clock;
877 let start_nanos = datetime_to_unix_nanos(start);
878 let end_nanos = datetime_to_unix_nanos(end);
879
880 get_runtime().spawn(async move {
881 match http_client
882 .request_trade_ticks(instrument_id, start, end, limit)
883 .await
884 .context("failed to request trades from dYdX")
885 {
886 Ok(trades) => {
887 let response = DataResponse::Trades(TradesResponse::new(
888 request_id,
889 client_id,
890 instrument_id,
891 trades,
892 start_nanos,
893 end_nanos,
894 clock.get_time_ns(),
895 params,
896 ));
897 if let Err(e) = sender.send(DataEvent::Response(response)) {
898 log::error!("Failed to send trades response: {e}");
899 }
900 }
901 Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
902 }
903 });
904
905 Ok(())
906 }
907
908 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
909 let http_client = self.http_client.clone();
910 let sender = self.data_sender.clone();
911 let bar_type = request.bar_type;
912 let start = request.start;
913 let end = request.end;
914 let limit = request.limit.map(|n| n.get() as u32);
915 let request_id = request.request_id;
916 let client_id = request.client_id.unwrap_or(self.client_id);
917 let params = request.params;
918 let clock = self.clock;
919 let start_nanos = datetime_to_unix_nanos(start);
920 let end_nanos = datetime_to_unix_nanos(end);
921
922 get_runtime().spawn(async move {
923 match http_client
924 .request_bars(bar_type, start, end, limit, true)
925 .await
926 .context("failed to request bars from dYdX")
927 {
928 Ok(bars) => {
929 let response = DataResponse::Bars(BarsResponse::new(
930 request_id,
931 client_id,
932 bar_type,
933 bars,
934 start_nanos,
935 end_nanos,
936 clock.get_time_ns(),
937 params,
938 ));
939 if let Err(e) = sender.send(DataEvent::Response(response)) {
940 log::error!("Failed to send bars response: {e}");
941 }
942 }
943 Err(e) => log::error!("Bar request failed for {bar_type}: {e:?}"),
944 }
945 });
946
947 Ok(())
948 }
949}
950
951impl DydxDataClient {
952 #[must_use]
954 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
955 self.instrument_cache.get(instrument_id)
956 }
957
958 #[must_use]
960 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
961 self.instrument_cache.all_instruments()
962 }
963
964 pub fn cache_instrument(&self, instrument: InstrumentAny) {
966 self.instrument_cache.insert_instrument_only(instrument);
967 }
968
969 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
973 self.instrument_cache.clear();
974 self.instrument_cache.insert_instruments_only(instruments);
975 }
976
977 fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
978 self.order_books
979 .entry(instrument_id)
980 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
981 }
982
983 #[must_use]
985 pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
986 self.bar_type_mappings
987 .get(topic)
988 .map(|entry| *entry.value())
989 }
990
991 #[must_use]
993 pub fn get_bar_topics(&self) -> Vec<String> {
994 self.bar_type_mappings
995 .iter()
996 .map(|entry| entry.key().clone())
997 .collect()
998 }
999
1000 fn handle_ws_message(message: NautilusWsMessage, ctx: &WsMessageContext) {
1001 match message {
1002 NautilusWsMessage::Data(payloads) => {
1003 Self::handle_data_message(payloads, &ctx.data_sender, &ctx.incomplete_bars);
1004 }
1005 NautilusWsMessage::Deltas(deltas) => {
1006 Self::handle_deltas_message(
1007 *deltas,
1008 &ctx.data_sender,
1009 &ctx.order_books,
1010 &ctx.last_quotes,
1011 &ctx.instrument_cache,
1012 &ctx.active_quote_subs,
1013 &ctx.active_delta_subs,
1014 );
1015 }
1016 NautilusWsMessage::MarkPrice(mark_price) => {
1017 if ctx
1018 .active_mark_price_subs
1019 .contains(&mark_price.instrument_id)
1020 {
1021 let data = NautilusData::MarkPriceUpdate(mark_price);
1022 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1023 log::error!("Failed to emit mark price: {e}");
1024 }
1025 }
1026 }
1027 NautilusWsMessage::IndexPrice(index_price) => {
1028 if ctx
1029 .active_index_price_subs
1030 .contains(&index_price.instrument_id)
1031 {
1032 let data = NautilusData::IndexPriceUpdate(index_price);
1033 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1034 log::error!("Failed to emit index price: {e}");
1035 }
1036 }
1037 }
1038 NautilusWsMessage::FundingRate(funding_rate) => {
1039 if ctx
1040 .active_funding_rate_subs
1041 .contains(&funding_rate.instrument_id)
1042 && let Err(e) = ctx.data_sender.send(DataEvent::FundingRate(funding_rate))
1043 {
1044 log::error!("Failed to emit funding rate: {e}");
1045 }
1046 }
1047 NautilusWsMessage::Error(err) => {
1048 log::error!("dYdX WS error: {err}");
1049 }
1050 NautilusWsMessage::Reconnected => {
1051 log::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1052
1053 let total_subs = ctx.active_quote_subs.len()
1054 + ctx.active_delta_subs.len()
1055 + ctx.active_trade_subs.len()
1056 + ctx.active_bar_subs.len();
1057
1058 if total_subs == 0 {
1059 log::debug!("No active subscriptions to restore");
1060 return;
1061 }
1062
1063 log::info!(
1064 "Restoring {} subscriptions (quotes={}, deltas={}, trades={}, bars={})",
1065 total_subs,
1066 ctx.active_quote_subs.len(),
1067 ctx.active_delta_subs.len(),
1068 ctx.active_trade_subs.len(),
1069 ctx.active_bar_subs.len()
1070 );
1071
1072 for instrument_id in ctx.active_quote_subs.iter() {
1074 let instrument_id = *instrument_id;
1075 let ws_clone = ctx.ws_client.clone();
1076 get_runtime().spawn(async move {
1077 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1078 log::error!(
1079 "Failed to re-subscribe to orderbook (quotes) for {instrument_id}: {e:?}"
1080 );
1081 } else {
1082 log::debug!("Re-subscribed to orderbook (quotes) for {instrument_id}");
1083 }
1084 });
1085 }
1086
1087 for instrument_id in ctx.active_delta_subs.iter() {
1089 let instrument_id = *instrument_id;
1090 let ws_clone = ctx.ws_client.clone();
1091 get_runtime().spawn(async move {
1092 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1093 log::error!(
1094 "Failed to re-subscribe to orderbook (deltas) for {instrument_id}: {e:?}"
1095 );
1096 } else {
1097 log::debug!("Re-subscribed to orderbook (deltas) for {instrument_id}");
1098 }
1099 });
1100 }
1101
1102 for entry in ctx.active_trade_subs.iter() {
1104 let instrument_id = *entry.key();
1105 let ws_clone = ctx.ws_client.clone();
1106 get_runtime().spawn(async move {
1107 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1108 log::error!(
1109 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1110 );
1111 } else {
1112 log::debug!("Re-subscribed to trades for {instrument_id}");
1113 }
1114 });
1115 }
1116
1117 for entry in ctx.active_bar_subs.iter() {
1119 let (instrument_id, resolution) = entry.key();
1120 let instrument_id = *instrument_id;
1121 let resolution = resolution.clone();
1122 let bar_type = *entry.value();
1123 let ws_clone = ctx.ws_client.clone();
1124
1125 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1127 let topic = format!("{ticker}/{resolution}");
1128 if let Err(e) = ctx
1129 .ws_client
1130 .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
1131 {
1132 log::warn!(
1133 "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1134 );
1135 }
1136
1137 get_runtime().spawn(async move {
1138 if let Err(e) =
1139 ws_clone.subscribe_candles(instrument_id, &resolution).await
1140 {
1141 log::error!(
1142 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1143 );
1144 } else {
1145 log::debug!(
1146 "Re-subscribed to candles for {instrument_id} ({resolution})"
1147 );
1148 }
1149 });
1150 }
1151
1152 log::info!("Completed re-subscription requests after reconnection");
1153 }
1154 NautilusWsMessage::BlockHeight { .. } => {
1155 log::debug!(
1156 "Ignoring block height message on dYdX data client (handled by execution adapter)"
1157 );
1158 }
1159 NautilusWsMessage::Order(_)
1160 | NautilusWsMessage::Fill(_)
1161 | NautilusWsMessage::Position(_)
1162 | NautilusWsMessage::AccountState(_)
1163 | NautilusWsMessage::SubaccountSubscribed(_)
1164 | NautilusWsMessage::SubaccountsChannelData(_) => {
1165 log::debug!(
1166 "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1167 );
1168 }
1169 NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
1170 log::info!("New instrument discovered via WebSocket: {ticker}");
1172
1173 let http_client = ctx.http_client.clone();
1174 let ws_client = ctx.ws_client.clone();
1175 let data_sender = ctx.data_sender.clone();
1176
1177 get_runtime().spawn(async move {
1178 match http_client.fetch_and_cache_single_instrument(&ticker).await {
1179 Ok(Some(instrument)) => {
1180 ws_client.cache_instrument(instrument.clone());
1182 if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
1186 log::error!("Failed to emit new instrument: {e}");
1187 }
1188 log::info!("Fetched and cached new instrument: {ticker}");
1189 }
1190 Ok(None) => {
1191 log::warn!("New instrument {ticker} not found or inactive");
1192 }
1193 Err(e) => {
1194 log::error!("Failed to fetch new instrument {ticker}: {e}");
1195 }
1196 }
1197 });
1198 }
1199 }
1200 }
1201
1202 fn handle_data_message(
1203 payloads: Vec<NautilusData>,
1204 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1205 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1206 ) {
1207 for data in payloads {
1208 if let NautilusData::Bar(bar) = data {
1210 Self::handle_bar_message(bar, data_sender, incomplete_bars);
1211 } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1212 log::error!("Failed to emit data event: {e}");
1213 }
1214 }
1215 }
1216
1217 fn handle_bar_message(
1224 bar: Bar,
1225 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1226 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1227 ) {
1228 let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1229 let bar_type = bar.bar_type;
1230
1231 if bar.ts_event <= current_time_ns {
1232 incomplete_bars.remove(&bar_type);
1234 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1235 log::error!("Failed to emit completed bar: {e}");
1236 }
1237 } else {
1238 log::trace!(
1240 "Caching incomplete bar for {} (ts_event={}, current={})",
1241 bar_type,
1242 bar.ts_event,
1243 current_time_ns
1244 );
1245 incomplete_bars.insert(bar_type, bar);
1246 }
1247 }
1248
1249 fn resolve_crossed_order_book(
1267 book: &mut OrderBook,
1268 venue_deltas: OrderBookDeltas,
1269 instrument: &InstrumentAny,
1270 ) -> anyhow::Result<OrderBookDeltas> {
1271 let instrument_id = venue_deltas.instrument_id;
1272 let ts_init = venue_deltas.ts_init;
1273 let mut all_deltas = venue_deltas.deltas.clone();
1274
1275 book.apply_deltas(&venue_deltas)?;
1277
1278 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1280 (book.best_bid_price(), book.best_ask_price())
1281 {
1282 bid_price >= ask_price
1283 } else {
1284 false
1285 };
1286
1287 while is_crossed {
1289 log::debug!(
1290 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1291 instrument_id,
1292 book.best_bid_price(),
1293 book.best_ask_price()
1294 );
1295
1296 let bid_price = match book.best_bid_price() {
1297 Some(p) => p,
1298 None => break,
1299 };
1300 let ask_price = match book.best_ask_price() {
1301 Some(p) => p,
1302 None => break,
1303 };
1304 let bid_size = match book.best_bid_size() {
1305 Some(s) => s,
1306 None => break,
1307 };
1308 let ask_size = match book.best_ask_size() {
1309 Some(s) => s,
1310 None => break,
1311 };
1312
1313 let mut temp_deltas = Vec::new();
1314
1315 if bid_size > ask_size {
1316 let new_bid_size = Quantity::new(
1318 bid_size.as_f64() - ask_size.as_f64(),
1319 instrument.size_precision(),
1320 );
1321 temp_deltas.push(OrderBookDelta::new(
1322 instrument_id,
1323 BookAction::Update,
1324 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1325 0,
1326 0,
1327 ts_init,
1328 ts_init,
1329 ));
1330 temp_deltas.push(OrderBookDelta::new(
1331 instrument_id,
1332 BookAction::Delete,
1333 BookOrder::new(
1334 OrderSide::Sell,
1335 ask_price,
1336 Quantity::new(0.0, instrument.size_precision()),
1337 0,
1338 ),
1339 0,
1340 0,
1341 ts_init,
1342 ts_init,
1343 ));
1344 } else if bid_size < ask_size {
1345 let new_ask_size = Quantity::new(
1347 ask_size.as_f64() - bid_size.as_f64(),
1348 instrument.size_precision(),
1349 );
1350 temp_deltas.push(OrderBookDelta::new(
1351 instrument_id,
1352 BookAction::Update,
1353 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1354 0,
1355 0,
1356 ts_init,
1357 ts_init,
1358 ));
1359 temp_deltas.push(OrderBookDelta::new(
1360 instrument_id,
1361 BookAction::Delete,
1362 BookOrder::new(
1363 OrderSide::Buy,
1364 bid_price,
1365 Quantity::new(0.0, instrument.size_precision()),
1366 0,
1367 ),
1368 0,
1369 0,
1370 ts_init,
1371 ts_init,
1372 ));
1373 } else {
1374 temp_deltas.push(OrderBookDelta::new(
1376 instrument_id,
1377 BookAction::Delete,
1378 BookOrder::new(
1379 OrderSide::Buy,
1380 bid_price,
1381 Quantity::new(0.0, instrument.size_precision()),
1382 0,
1383 ),
1384 0,
1385 0,
1386 ts_init,
1387 ts_init,
1388 ));
1389 temp_deltas.push(OrderBookDelta::new(
1390 instrument_id,
1391 BookAction::Delete,
1392 BookOrder::new(
1393 OrderSide::Sell,
1394 ask_price,
1395 Quantity::new(0.0, instrument.size_precision()),
1396 0,
1397 ),
1398 0,
1399 0,
1400 ts_init,
1401 ts_init,
1402 ));
1403 }
1404
1405 let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1407 book.apply_deltas(&temp_deltas_obj)?;
1408 all_deltas.extend(temp_deltas);
1409
1410 is_crossed = if let (Some(bid_price), Some(ask_price)) =
1412 (book.best_bid_price(), book.best_ask_price())
1413 {
1414 bid_price >= ask_price
1415 } else {
1416 false
1417 };
1418 }
1419
1420 if let Some(last_delta) = all_deltas.last_mut() {
1422 last_delta.flags = RecordFlag::F_LAST as u8;
1423 }
1424
1425 Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1426 }
1427
1428 fn handle_deltas_message(
1429 deltas: OrderBookDeltas,
1430 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1431 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1432 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1433 instrument_cache: &Arc<InstrumentCache>,
1434 active_quote_subs: &Arc<DashSet<InstrumentId>>,
1435 active_delta_subs: &Arc<DashSet<InstrumentId>>,
1436 ) {
1437 let instrument_id = deltas.instrument_id;
1438
1439 let instrument = match instrument_cache.get(&instrument_id) {
1441 Some(inst) => inst,
1442 None => {
1443 log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1444 if active_delta_subs.contains(&instrument_id)
1446 && let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1447 OrderBookDeltas_API::new(deltas),
1448 )))
1449 {
1450 log::error!("Failed to emit order book deltas: {e}");
1451 }
1452 return;
1453 }
1454 };
1455
1456 let mut book = order_books
1458 .entry(instrument_id)
1459 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1460
1461 let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
1463 {
1464 Ok(d) => d,
1465 Err(e) => {
1466 log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
1467 return;
1468 }
1469 };
1470
1471 if active_quote_subs.contains(&instrument_id) {
1473 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
1476 (book.best_bid_price(), book.best_ask_price())
1477 && let (Some(bid_size), Some(ask_size)) =
1478 (book.best_bid_size(), book.best_ask_size())
1479 {
1480 Some(QuoteTick::new(
1481 instrument_id,
1482 bid_price,
1483 ask_price,
1484 bid_size,
1485 ask_size,
1486 resolved_deltas.ts_event,
1487 resolved_deltas.ts_init,
1488 ))
1489 } else {
1490 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
1492 log::debug!(
1493 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
1494 );
1495 last_quotes.get(&instrument_id).map(|q| *q)
1496 } else {
1497 None
1498 }
1499 };
1500
1501 if let Some(quote) = quote_opt {
1502 let emit_quote = !matches!(
1504 last_quotes.get(&instrument_id),
1505 Some(existing) if *existing == quote
1506 );
1507
1508 if emit_quote {
1509 last_quotes.insert(instrument_id, quote);
1510 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
1511 log::error!("Failed to emit quote tick: {e}");
1512 }
1513 }
1514 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
1515 log::debug!(
1517 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
1518 book.best_bid_price(),
1519 book.best_ask_price()
1520 );
1521 }
1522 }
1523
1524 if active_delta_subs.contains(&instrument_id) {
1526 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
1527 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1528 log::error!("Failed to emit order book deltas event: {e}");
1529 }
1530 }
1531 }
1532}