1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26 clients::DataClient,
27 live::{runner::get_data_event_sender, runtime::get_runtime},
28 messages::{
29 DataEvent, DataResponse,
30 data::{
31 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
32 RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
33 SubscribeInstrument, SubscribeInstruments, SubscribeQuotes, SubscribeTrades,
34 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeInstrument,
35 UnsubscribeInstruments, UnsubscribeQuotes, UnsubscribeTrades,
36 },
37 },
38};
39use nautilus_core::{
40 UnixNanos,
41 datetime::datetime_to_unix_nanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45 data::{
46 Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
47 OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick, TradeTick,
48 },
49 enums::{
50 AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
51 PriceType, RecordFlag,
52 },
53 identifiers::{ClientId, InstrumentId, TradeId, Venue},
54 instruments::{Instrument, InstrumentAny},
55 orderbook::OrderBook,
56 types::{Price, Quantity},
57};
58use rust_decimal::Decimal;
59use tokio::{task::JoinHandle, time::Duration};
60use tokio_util::sync::CancellationToken;
61use ustr::Ustr;
62
63use crate::{
64 common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
65 config::DydxDataClientConfig,
66 http::client::DydxHttpClient,
67 types::DydxOraclePrice,
68 websocket::client::DydxWebSocketClient,
69};
70
71struct WsMessageContext {
73 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
74 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
75 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
76 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
77 ws_client: DydxWebSocketClient,
78 active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
79 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
80 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
81 incomplete_bars: Arc<DashMap<BarType, Bar>>,
82}
83
84#[derive(Debug)]
92pub struct DydxDataClient {
93 client_id: ClientId,
95 config: DydxDataClientConfig,
97 http_client: DydxHttpClient,
99 ws_client: DydxWebSocketClient,
101 is_connected: AtomicBool,
103 cancellation_token: CancellationToken,
105 tasks: Vec<JoinHandle<()>>,
107 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
109 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
111 clock: &'static AtomicTime,
113 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
115 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
117 incomplete_bars: Arc<DashMap<BarType, Bar>>,
121 bar_type_mappings: Arc<DashMap<String, BarType>>,
125 active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
127 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
129 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
131}
132
133impl DydxDataClient {
134 fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
140 match spec.step.get() {
141 1 => match spec.aggregation {
142 BarAggregation::Minute => Ok("1MIN"),
143 BarAggregation::Hour => Ok("1HOUR"),
144 BarAggregation::Day => Ok("1DAY"),
145 _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
146 },
147 5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
148 15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
149 30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
150 4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
151 step => anyhow::bail!(
152 "Unsupported bar step: {step} with aggregation {:?}",
153 spec.aggregation
154 ),
155 }
156 }
157
158 pub fn new(
164 client_id: ClientId,
165 config: DydxDataClientConfig,
166 http_client: DydxHttpClient,
167 ws_client: DydxWebSocketClient,
168 ) -> anyhow::Result<Self> {
169 let clock = get_atomic_clock_realtime();
170 let data_sender = get_data_event_sender();
171
172 let instruments_cache = http_client.instruments().clone();
174
175 Ok(Self {
176 client_id,
177 config,
178 http_client,
179 ws_client,
180 is_connected: AtomicBool::new(false),
181 cancellation_token: CancellationToken::new(),
182 tasks: Vec::new(),
183 data_sender,
184 instruments: instruments_cache,
185 clock,
186 order_books: Arc::new(DashMap::new()),
187 last_quotes: Arc::new(DashMap::new()),
188 incomplete_bars: Arc::new(DashMap::new()),
189 bar_type_mappings: Arc::new(DashMap::new()),
190 active_orderbook_subs: Arc::new(DashMap::new()),
191 active_trade_subs: Arc::new(DashMap::new()),
192 active_bar_subs: Arc::new(DashMap::new()),
193 })
194 }
195
196 #[must_use]
198 pub fn venue(&self) -> Venue {
199 *DYDX_VENUE
200 }
201
202 #[must_use]
204 pub fn is_connected(&self) -> bool {
205 self.is_connected.load(Ordering::Relaxed)
206 }
207
208 fn spawn_ws<F>(&self, fut: F, context: &'static str)
212 where
213 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
214 {
215 get_runtime().spawn(async move {
216 if let Err(e) = fut.await {
217 log::error!("{context}: {e:?}");
218 }
219 });
220 }
221
222 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
237 log::info!("Bootstrapping dYdX instruments");
238
239 self.http_client
241 .fetch_and_cache_instruments()
242 .await
243 .context("failed to load instruments from dYdX")?;
244
245 let instruments: Vec<InstrumentAny> = self
246 .http_client
247 .instruments()
248 .iter()
249 .map(|entry| entry.value().clone())
250 .collect();
251
252 if instruments.is_empty() {
253 log::warn!("No dYdX instruments were loaded");
254 return Ok(instruments);
255 }
256
257 log::info!("Loaded {} dYdX instruments", instruments.len());
258
259 self.ws_client.cache_instruments(instruments.clone());
260
261 Ok(instruments)
262 }
263}
264
265#[async_trait::async_trait(?Send)]
266impl DataClient for DydxDataClient {
267 fn client_id(&self) -> ClientId {
268 self.client_id
269 }
270
271 fn venue(&self) -> Option<Venue> {
272 Some(*DYDX_VENUE)
273 }
274
275 fn start(&mut self) -> anyhow::Result<()> {
276 log::info!(
277 "Starting dYdX data client: client_id={}, is_testnet={}",
278 self.client_id,
279 self.http_client.is_testnet()
280 );
281 Ok(())
282 }
283
284 fn stop(&mut self) -> anyhow::Result<()> {
285 log::info!("Stopping dYdX data client {}", self.client_id);
286 self.cancellation_token.cancel();
287 self.is_connected.store(false, Ordering::Relaxed);
288 Ok(())
289 }
290
291 fn reset(&mut self) -> anyhow::Result<()> {
292 log::debug!("Resetting dYdX data client {}", self.client_id);
293 self.is_connected.store(false, Ordering::Relaxed);
294 self.cancellation_token = CancellationToken::new();
295 self.tasks.clear();
296 Ok(())
297 }
298
299 fn dispose(&mut self) -> anyhow::Result<()> {
300 log::debug!("Disposing dYdX data client {}", self.client_id);
301 self.stop()
302 }
303
304 async fn connect(&mut self) -> anyhow::Result<()> {
305 if self.is_connected() {
306 return Ok(());
307 }
308
309 log::info!("Connecting dYdX data client");
310
311 self.bootstrap_instruments().await?;
313
314 self.ws_client
316 .connect()
317 .await
318 .context("failed to connect dYdX websocket")?;
319
320 self.ws_client
321 .subscribe_markets()
322 .await
323 .context("failed to subscribe to markets channel")?;
324
325 if let Some(rx) = self.ws_client.take_receiver() {
327 let data_tx = self.data_sender.clone();
328 let instruments = self.instruments.clone();
329 let order_books = self.order_books.clone();
330 let last_quotes = self.last_quotes.clone();
331 let ws_client = self.ws_client.clone();
332 let active_orderbook_subs = self.active_orderbook_subs.clone();
333 let active_trade_subs = self.active_trade_subs.clone();
334 let active_bar_subs = self.active_bar_subs.clone();
335 let incomplete_bars = self.incomplete_bars.clone();
336
337 let ctx = WsMessageContext {
338 data_sender: data_tx,
339 instruments,
340 order_books,
341 last_quotes,
342 ws_client,
343 active_orderbook_subs,
344 active_trade_subs,
345 active_bar_subs,
346 incomplete_bars,
347 };
348
349 let task = get_runtime().spawn(async move {
350 let mut rx = rx;
351 while let Some(msg) = rx.recv().await {
352 Self::handle_ws_message(msg, &ctx);
353 }
354 });
355 self.tasks.push(task);
356 } else {
357 log::warn!("No inbound WS receiver available after connect");
358 }
359
360 self.start_orderbook_refresh_task()?;
362
363 self.start_instrument_refresh_task()?;
365
366 self.is_connected.store(true, Ordering::Relaxed);
367 log::info!("Connected dYdX data client");
368
369 Ok(())
370 }
371
372 async fn disconnect(&mut self) -> anyhow::Result<()> {
373 if !self.is_connected() {
374 return Ok(());
375 }
376
377 log::info!("Disconnecting dYdX data client");
378
379 self.ws_client
380 .disconnect()
381 .await
382 .context("failed to disconnect dYdX websocket")?;
383
384 self.is_connected.store(false, Ordering::Relaxed);
385 log::info!("Disconnected dYdX data client");
386
387 Ok(())
388 }
389
390 fn is_connected(&self) -> bool {
391 self.is_connected.load(Ordering::Relaxed)
392 }
393
394 fn is_disconnected(&self) -> bool {
395 !self.is_connected()
396 }
397
398 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
399 log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
403 Ok(())
404 }
405
406 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
407 log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
410 Ok(())
411 }
412
413 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
414 log::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
417 Ok(())
418 }
419
420 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
421 log::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
424 Ok(())
425 }
426
427 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
428 let ws = self.ws_client.clone();
429 let instrument_id = cmd.instrument_id;
430
431 self.active_trade_subs.insert(instrument_id, ());
433
434 self.spawn_ws(
435 async move {
436 ws.subscribe_trades(instrument_id)
437 .await
438 .context("trade subscription")
439 },
440 "dYdX trade subscription",
441 );
442
443 Ok(())
444 }
445
446 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
447 if cmd.book_type != BookType::L2_MBP {
448 anyhow::bail!(
449 "dYdX only supports L2_MBP order book deltas, received {:?}",
450 cmd.book_type
451 );
452 }
453
454 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
456
457 self.active_orderbook_subs.insert(cmd.instrument_id, ());
459
460 let ws = self.ws_client.clone();
461 let instrument_id = cmd.instrument_id;
462
463 self.spawn_ws(
464 async move {
465 ws.subscribe_orderbook(instrument_id)
466 .await
467 .context("orderbook subscription")
468 },
469 "dYdX orderbook subscription",
470 );
471
472 Ok(())
473 }
474
475 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
476 log::debug!(
479 "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
480 cmd.instrument_id
481 );
482
483 let book_cmd = SubscribeBookDeltas {
485 client_id: cmd.client_id,
486 venue: cmd.venue,
487 instrument_id: cmd.instrument_id,
488 book_type: BookType::L2_MBP,
489 depth: None,
490 managed: false,
491 correlation_id: None,
492 params: None,
493 command_id: cmd.command_id,
494 ts_init: cmd.ts_init,
495 };
496
497 self.subscribe_book_deltas(&book_cmd)
498 }
499
500 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
501 let ws = self.ws_client.clone();
502 let instrument_id = cmd.bar_type.instrument_id();
503 let spec = cmd.bar_type.spec();
504
505 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
507
508 let bar_type = cmd.bar_type;
510 self.active_bar_subs
511 .insert((instrument_id, resolution.to_string()), bar_type);
512
513 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
515 let topic = format!("{ticker}/{resolution}");
516 self.bar_type_mappings.insert(topic.clone(), bar_type);
517
518 self.spawn_ws(
519 async move {
520 if let Err(e) =
522 ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
523 topic,
524 bar_type,
525 })
526 {
527 anyhow::bail!("Failed to register bar type: {e}");
528 }
529
530 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
532
533 ws.subscribe_candles(instrument_id, resolution)
534 .await
535 .context("candles subscription")
536 },
537 "dYdX candles subscription",
538 );
539
540 Ok(())
541 }
542
543 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
544 self.active_trade_subs.remove(&cmd.instrument_id);
546
547 let ws = self.ws_client.clone();
548 let instrument_id = cmd.instrument_id;
549
550 self.spawn_ws(
551 async move {
552 ws.unsubscribe_trades(instrument_id)
553 .await
554 .context("trade unsubscription")
555 },
556 "dYdX trade unsubscription",
557 );
558
559 Ok(())
560 }
561
562 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
563 self.active_orderbook_subs.remove(&cmd.instrument_id);
565
566 let ws = self.ws_client.clone();
567 let instrument_id = cmd.instrument_id;
568
569 self.spawn_ws(
570 async move {
571 ws.unsubscribe_orderbook(instrument_id)
572 .await
573 .context("orderbook unsubscription")
574 },
575 "dYdX orderbook unsubscription",
576 );
577
578 Ok(())
579 }
580
581 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
582 log::debug!(
584 "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
585 cmd.instrument_id
586 );
587
588 let book_cmd = UnsubscribeBookDeltas {
589 instrument_id: cmd.instrument_id,
590 client_id: cmd.client_id,
591 venue: cmd.venue,
592 command_id: cmd.command_id,
593 ts_init: cmd.ts_init,
594 correlation_id: None,
595 params: cmd.params.clone(),
596 };
597
598 self.unsubscribe_book_deltas(&book_cmd)
599 }
600
601 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
602 let ws = self.ws_client.clone();
603 let instrument_id = cmd.bar_type.instrument_id();
604 let spec = cmd.bar_type.spec();
605
606 let resolution = match spec.step.get() {
608 1 => match spec.aggregation {
609 BarAggregation::Minute => "1MIN",
610 BarAggregation::Hour => "1HOUR",
611 BarAggregation::Day => "1DAY",
612 _ => {
613 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
614 }
615 },
616 5 => {
617 if spec.aggregation == BarAggregation::Minute {
618 "5MINS"
619 } else {
620 anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
621 }
622 }
623 15 => {
624 if spec.aggregation == BarAggregation::Minute {
625 "15MINS"
626 } else {
627 anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
628 }
629 }
630 30 => {
631 if spec.aggregation == BarAggregation::Minute {
632 "30MINS"
633 } else {
634 anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
635 }
636 }
637 4 => {
638 if spec.aggregation == BarAggregation::Hour {
639 "4HOURS"
640 } else {
641 anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
642 }
643 }
644 step => {
645 anyhow::bail!("Unsupported bar step: {step}");
646 }
647 };
648
649 self.active_bar_subs
651 .remove(&(instrument_id, resolution.to_string()));
652
653 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
655 let topic = format!("{ticker}/{resolution}");
656 self.bar_type_mappings.remove(&topic);
657
658 if let Err(e) =
659 ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
660 {
661 log::warn!("Failed to unregister bar type: {e}");
662 }
663
664 self.spawn_ws(
665 async move {
666 ws.unsubscribe_candles(instrument_id, resolution)
667 .await
668 .context("candles unsubscription")
669 },
670 "dYdX candles unsubscription",
671 );
672
673 Ok(())
674 }
675
676 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
677 let instruments_cache = self.instruments.clone();
678 let sender = self.data_sender.clone();
679 let http = self.http_client.clone();
680 let instrument_id = request.instrument_id;
681 let request_id = request.request_id;
682 let client_id = request.client_id.unwrap_or(self.client_id);
683 let start = request.start;
684 let end = request.end;
685 let params = request.params.clone();
686 let clock = self.clock;
687 let start_nanos = datetime_to_unix_nanos(start);
688 let end_nanos = datetime_to_unix_nanos(end);
689
690 get_runtime().spawn(async move {
691 let symbol = Ustr::from(instrument_id.symbol.as_str());
693 let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
694 log::debug!("Found instrument {instrument_id} in cache");
695 Some(cached.clone())
696 } else {
697 log::debug!("Instrument {instrument_id} not in cache, fetching from API");
699 match http.request_instruments(None, None, None).await {
700 Ok(instruments) => {
701 for inst in &instruments {
703 upsert_instrument(&instruments_cache, inst.clone());
704 }
705 instruments.into_iter().find(|i| i.id() == instrument_id)
707 }
708 Err(e) => {
709 log::error!("Failed to fetch instruments from dYdX: {e:?}");
710 None
711 }
712 }
713 };
714
715 if let Some(inst) = instrument {
716 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
717 request_id,
718 client_id,
719 instrument_id,
720 inst,
721 start_nanos,
722 end_nanos,
723 clock.get_time_ns(),
724 params,
725 )));
726
727 if let Err(e) = sender.send(DataEvent::Response(response)) {
728 log::error!("Failed to send instrument response: {e}");
729 }
730 } else {
731 log::error!("Instrument {instrument_id} not found");
732 }
733 });
734
735 Ok(())
736 }
737
738 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
739 let http = self.http_client.clone();
740 let sender = self.data_sender.clone();
741 let instruments_cache = self.instruments.clone();
742 let request_id = request.request_id;
743 let client_id = request.client_id.unwrap_or(self.client_id);
744 let venue = self.venue();
745 let start = request.start;
746 let end = request.end;
747 let params = request.params.clone();
748 let clock = self.clock;
749 let start_nanos = datetime_to_unix_nanos(start);
750 let end_nanos = datetime_to_unix_nanos(end);
751
752 get_runtime().spawn(async move {
753 match http.request_instruments(None, None, None).await {
754 Ok(instruments) => {
755 log::info!("Fetched {} instruments from dYdX", instruments.len());
756
757 for instrument in &instruments {
759 upsert_instrument(&instruments_cache, instrument.clone());
760 }
761
762 let response = DataResponse::Instruments(InstrumentsResponse::new(
763 request_id,
764 client_id,
765 venue,
766 instruments,
767 start_nanos,
768 end_nanos,
769 clock.get_time_ns(),
770 params,
771 ));
772
773 if let Err(e) = sender.send(DataEvent::Response(response)) {
774 log::error!("Failed to send instruments response: {e}");
775 }
776 }
777 Err(e) => {
778 log::error!("Failed to fetch instruments from dYdX: {e:?}");
779
780 let response = DataResponse::Instruments(InstrumentsResponse::new(
782 request_id,
783 client_id,
784 venue,
785 Vec::new(),
786 start_nanos,
787 end_nanos,
788 clock.get_time_ns(),
789 params,
790 ));
791
792 if let Err(e) = sender.send(DataEvent::Response(response)) {
793 log::error!("Failed to send empty instruments response: {e}");
794 }
795 }
796 }
797 });
798
799 Ok(())
800 }
801
802 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
803 let http = self.http_client.clone();
804 let instruments = self.instruments.clone();
805 let sender = self.data_sender.clone();
806 let instrument_id = request.instrument_id;
807 let start = request.start;
808 let end = request.end;
809 let limit = request.limit.map(|n| n.get() as u32);
810 let request_id = request.request_id;
811 let client_id = request.client_id.unwrap_or(self.client_id);
812 let params = request.params.clone();
813 let clock = self.clock;
814 let start_nanos = datetime_to_unix_nanos(start);
815 let end_nanos = datetime_to_unix_nanos(end);
816
817 get_runtime().spawn(async move {
818 let ticker = instrument_id
822 .symbol
823 .as_str()
824 .trim_end_matches("-PERP")
825 .to_string();
826
827 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
829 Some(inst) => inst.clone(),
830 None => {
831 log::error!(
832 "request_trades: instrument {instrument_id} not found in cache; cannot convert trades"
833 );
834 let ts_now = clock.get_time_ns();
835 let response = DataResponse::Trades(TradesResponse::new(
836 request_id,
837 client_id,
838 instrument_id,
839 Vec::new(),
840 start_nanos,
841 end_nanos,
842 ts_now,
843 params,
844 ));
845 if let Err(e) = sender.send(DataEvent::Response(response)) {
846 log::error!("Failed to send empty trades response: {e}");
847 }
848 return;
849 }
850 };
851
852 let price_precision = instrument.price_precision();
853 let size_precision = instrument.size_precision();
854
855 match http
856 .inner
857 .get_trades(&ticker, limit)
858 .await
859 .context("failed to request trades from dYdX")
860 {
861 Ok(trades_response) => {
862 let mut ticks = Vec::new();
863
864 for trade in trades_response.trades {
865 let aggressor_side = match trade.side {
866 OrderSide::Buy => AggressorSide::Buyer,
867 OrderSide::Sell => AggressorSide::Seller,
868 _ => continue, };
870
871 let price = match Price::from_decimal_dp(trade.price, price_precision) {
872 Ok(p) => p,
873 Err(e) => {
874 log::warn!(
875 "request_trades: failed to convert price for trade {}: {e}",
876 trade.id
877 );
878 continue;
879 }
880 };
881
882 let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
883 Ok(q) => q,
884 Err(e) => {
885 log::warn!(
886 "request_trades: failed to convert size for trade {}: {e}",
887 trade.id
888 );
889 continue;
890 }
891 };
892
893 let ts_event = match trade.created_at.timestamp_nanos_opt() {
894 Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
895 _ => {
896 log::warn!(
897 "request_trades: timestamp out of range for trade {}",
898 trade.id
899 );
900 continue;
901 }
902 };
903
904 if let Some(start_ts) = start_nanos
906 && ts_event < start_ts
907 {
908 continue;
909 }
910 if let Some(end_ts) = end_nanos
911 && ts_event > end_ts
912 {
913 continue;
914 }
915
916 let tick = TradeTick::new(
917 instrument_id,
918 price,
919 size,
920 aggressor_side,
921 TradeId::new(&trade.id),
922 ts_event,
923 clock.get_time_ns(),
924 );
925 ticks.push(tick);
926 }
927
928 let response = DataResponse::Trades(TradesResponse::new(
929 request_id,
930 client_id,
931 instrument_id,
932 ticks,
933 start_nanos,
934 end_nanos,
935 clock.get_time_ns(),
936 params,
937 ));
938
939 if let Err(e) = sender.send(DataEvent::Response(response)) {
940 log::error!("Failed to send trades response: {e}");
941 }
942 }
943 Err(e) => {
944 log::error!("Trade request failed for {instrument_id}: {e:?}");
945
946 let response = DataResponse::Trades(TradesResponse::new(
947 request_id,
948 client_id,
949 instrument_id,
950 Vec::new(),
951 start_nanos,
952 end_nanos,
953 clock.get_time_ns(),
954 params,
955 ));
956
957 if let Err(e) = sender.send(DataEvent::Response(response)) {
958 log::error!("Failed to send empty trades response: {e}");
959 }
960 }
961 }
962 });
963
964 Ok(())
965 }
966
967 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
968 const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
969
970 let bar_type = request.bar_type;
971 let spec = bar_type.spec();
972
973 if bar_type.aggregation_source() != AggregationSource::External {
975 anyhow::bail!(
976 "dYdX only supports EXTERNAL aggregation, got {:?}",
977 bar_type.aggregation_source()
978 );
979 }
980
981 if spec.price_type != PriceType::Last {
982 anyhow::bail!(
983 "dYdX only supports LAST price type, got {:?}",
984 spec.price_type
985 );
986 }
987
988 let resolution = match spec.step.get() {
990 1 => match spec.aggregation {
991 BarAggregation::Minute => "1MIN",
992 BarAggregation::Hour => "1HOUR",
993 BarAggregation::Day => "1DAY",
994 _ => {
995 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
996 }
997 },
998 5 if spec.aggregation == BarAggregation::Minute => "5MINS",
999 15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1000 30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1001 4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1002 step => {
1003 anyhow::bail!("Unsupported bar step: {step}");
1004 }
1005 };
1006
1007 let http = self.http_client.clone();
1008 let instruments = self.instruments.clone();
1009 let sender = self.data_sender.clone();
1010 let instrument_id = bar_type.instrument_id();
1011 let symbol = instrument_id
1013 .symbol
1014 .as_str()
1015 .trim_end_matches("-PERP")
1016 .to_string();
1017 let request_id = request.request_id;
1018 let client_id = request.client_id.unwrap_or(self.client_id);
1019 let params = request.params.clone();
1020 let clock = self.clock;
1021
1022 let start = request.start;
1023 let end = request.end;
1024 let overall_limit = request.limit.map(|n| n.get() as u32);
1025
1026 let start_nanos = datetime_to_unix_nanos(start);
1028 let end_nanos = datetime_to_unix_nanos(end);
1029
1030 let resolution_enum = match resolution {
1032 "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1033 "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1034 "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1035 "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1036 "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1037 "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1038 "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1039 _ => {
1040 anyhow::bail!("Unsupported resolution: {resolution}");
1041 }
1042 };
1043
1044 get_runtime().spawn(async move {
1045 let bar_secs: i64 = match spec.aggregation {
1047 BarAggregation::Minute => spec.step.get() as i64 * 60,
1048 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1049 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1050 _ => {
1051 log::error!(
1052 "Unsupported aggregation for request_bars: {:?}",
1053 spec.aggregation
1054 );
1055 return;
1056 }
1057 };
1058
1059 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1061 Some(inst) => inst.clone(),
1062 None => {
1063 log::error!(
1064 "request_bars: instrument {instrument_id} not found in cache; cannot convert candles"
1065 );
1066 let ts_now = clock.get_time_ns();
1067 let response = DataResponse::Bars(BarsResponse::new(
1068 request_id,
1069 client_id,
1070 bar_type,
1071 Vec::new(),
1072 start_nanos,
1073 end_nanos,
1074 ts_now,
1075 params,
1076 ));
1077 if let Err(e) = sender.send(DataEvent::Response(response)) {
1078 log::error!("Failed to send empty bars response: {e}");
1079 }
1080 return;
1081 }
1082 };
1083
1084 let price_precision = instrument.price_precision();
1085 let size_precision = instrument.size_precision();
1086
1087 let mut all_bars: Vec<Bar> = Vec::new();
1088
1089 let (range_start, range_end) = match (start, end) {
1091 (Some(s), Some(e)) if e > s => (s, e),
1092 _ => {
1093 let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1094 match http
1095 .inner
1096 .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1097 .await
1098 {
1099 Ok(candles_response) => {
1100 log::debug!(
1101 "request_bars fetched {} candles without explicit date range",
1102 candles_response.candles.len()
1103 );
1104
1105 for candle in &candles_response.candles {
1106 match Self::candle_to_bar(
1107 candle,
1108 bar_type,
1109 price_precision,
1110 size_precision,
1111 bar_secs,
1112 clock,
1113 ) {
1114 Ok(bar) => all_bars.push(bar),
1115 Err(e) => {
1116 log::warn!(
1117 "Failed to convert dYdX candle to bar for {instrument_id}: {e}"
1118 );
1119 }
1120 }
1121 }
1122
1123 let current_time_ns = clock.get_time_ns();
1124 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1125
1126 let response = DataResponse::Bars(BarsResponse::new(
1127 request_id,
1128 client_id,
1129 bar_type,
1130 all_bars,
1131 start_nanos,
1132 end_nanos,
1133 current_time_ns,
1134 params,
1135 ));
1136
1137 if let Err(e) = sender.send(DataEvent::Response(response)) {
1138 log::error!("Failed to send bars response: {e}");
1139 }
1140 }
1141 Err(e) => {
1142 log::error!(
1143 "Failed to request candles for {symbol} without date range: {e:?}"
1144 );
1145 }
1146 }
1147 return;
1148 }
1149 };
1150
1151 let total_secs = (range_end - range_start).num_seconds().max(0);
1153 let expected_bars = (total_secs / bar_secs).max(1) as u64;
1154
1155 log::debug!(
1156 "request_bars range {range_start:?} -> {range_end:?}, expected_bars ~= {expected_bars}"
1157 );
1158
1159 let mut remaining = overall_limit.unwrap_or(u32::MAX);
1160
1161 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1163 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1164
1165 let mut chunk_start = range_start;
1166
1167 while chunk_start < range_end && remaining > 0 {
1168 let mut chunk_end = chunk_start + chunk_duration;
1169 if chunk_end > range_end {
1170 chunk_end = range_end;
1171 }
1172
1173 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1174
1175 log::debug!(
1176 "request_bars chunk: {chunk_start} -> {chunk_end}, limit={per_call_limit}"
1177 );
1178
1179 match http
1180 .inner
1181 .get_candles(
1182 &symbol,
1183 resolution_enum,
1184 Some(per_call_limit),
1185 Some(chunk_start),
1186 Some(chunk_end),
1187 )
1188 .await
1189 {
1190 Ok(candles_response) => {
1191 let count = candles_response.candles.len() as u32;
1192
1193 if count == 0 {
1194 break;
1196 }
1197
1198 for candle in &candles_response.candles {
1200 match Self::candle_to_bar(
1201 candle,
1202 bar_type,
1203 price_precision,
1204 size_precision,
1205 bar_secs,
1206 clock,
1207 ) {
1208 Ok(bar) => all_bars.push(bar),
1209 Err(e) => {
1210 log::warn!(
1211 "Failed to convert dYdX candle to bar for {instrument_id}: {e}"
1212 );
1213 }
1214 }
1215 }
1216
1217 if remaining <= count {
1218 break;
1219 } else {
1220 remaining -= count;
1221 }
1222 }
1223 Err(e) => {
1224 log::error!(
1225 "Failed to request candles for {symbol} in chunk {chunk_start:?} -> {chunk_end:?}: {e:?}"
1226 );
1227 break;
1228 }
1229 }
1230
1231 chunk_start += chunk_duration;
1232 }
1233
1234 log::debug!("request_bars completed partitioned fetch for {bar_type}");
1235
1236 let current_time_ns = clock.get_time_ns();
1238 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1239
1240 log::debug!(
1241 "request_bars filtered to {} completed bars (current_time_ns={})",
1242 all_bars.len(),
1243 current_time_ns
1244 );
1245
1246 let response = DataResponse::Bars(BarsResponse::new(
1247 request_id,
1248 client_id,
1249 bar_type,
1250 all_bars,
1251 start_nanos,
1252 end_nanos,
1253 current_time_ns,
1254 params,
1255 ));
1256
1257 if let Err(e) = sender.send(DataEvent::Response(response)) {
1258 log::error!("Failed to send bars response: {e}");
1259 }
1260 });
1261
1262 Ok(())
1263 }
1264}
1265
1266fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1268 let symbol = Ustr::from(instrument.id().symbol.as_str());
1269 cache.insert(symbol, instrument);
1270}
1271
1272impl DydxDataClient {
1273 pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1282 let interval_secs = match self.config.instrument_refresh_interval_secs {
1283 Some(secs) if secs > 0 => secs,
1284 _ => {
1285 log::info!("Instrument refresh disabled (interval not configured)");
1286 return Ok(());
1287 }
1288 };
1289
1290 let interval = Duration::from_secs(interval_secs);
1291 let http_client = self.http_client.clone();
1292 let instruments_cache = self.instruments.clone();
1293 let ws_client = self.ws_client.clone();
1294 let cancellation_token = self.cancellation_token.clone();
1295
1296 log::info!("Starting instrument refresh task (interval: {interval_secs}s)");
1297
1298 let task = get_runtime().spawn(async move {
1299 let mut interval_timer = tokio::time::interval(interval);
1300 interval_timer.tick().await; loop {
1303 tokio::select! {
1304 () = cancellation_token.cancelled() => {
1305 log::info!("Instrument refresh task cancelled");
1306 break;
1307 }
1308 _ = interval_timer.tick() => {
1309 log::debug!("Refreshing instruments");
1310
1311 match http_client.fetch_and_cache_instruments().await {
1313 Ok(()) => {
1314 let instruments: Vec<_> = http_client
1315 .instruments()
1316 .iter()
1317 .map(|entry| entry.value().clone())
1318 .collect();
1319
1320 log::debug!("Refreshed {} instruments", instruments.len());
1321
1322 for instrument in &instruments {
1323 upsert_instrument(&instruments_cache, instrument.clone());
1324 }
1325
1326 ws_client.cache_instruments(instruments);
1328 }
1329 Err(e) => {
1330 log::error!("Failed to refresh instruments: {e}");
1331 }
1332 }
1333 }
1334 }
1335 }
1336 });
1337
1338 self.tasks.push(task);
1339 Ok(())
1340 }
1341
1342 fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1352 let interval_secs = match self.config.orderbook_refresh_interval_secs {
1353 Some(secs) if secs > 0 => secs,
1354 _ => {
1355 log::info!("Orderbook snapshot refresh disabled (interval not configured)");
1356 return Ok(());
1357 }
1358 };
1359
1360 let interval = Duration::from_secs(interval_secs);
1361 let http_client = self.http_client.clone();
1362 let instruments = self.instruments.clone();
1363 let order_books = self.order_books.clone();
1364 let active_subs = self.active_orderbook_subs.clone();
1365 let cancellation_token = self.cancellation_token.clone();
1366 let data_sender = self.data_sender.clone();
1367
1368 log::info!("Starting orderbook snapshot refresh task (interval: {interval_secs}s)");
1369
1370 let task = get_runtime().spawn(async move {
1371 let mut interval_timer = tokio::time::interval(interval);
1372 interval_timer.tick().await; loop {
1375 tokio::select! {
1376 () = cancellation_token.cancelled() => {
1377 log::info!("Orderbook refresh task cancelled");
1378 break;
1379 }
1380 _ = interval_timer.tick() => {
1381 let active_instruments: Vec<InstrumentId> = active_subs
1382 .iter()
1383 .map(|entry| *entry.key())
1384 .collect();
1385
1386 if active_instruments.is_empty() {
1387 log::debug!("No active orderbook subscriptions to refresh");
1388 continue;
1389 }
1390
1391 log::debug!(
1392 "Refreshing {} orderbook snapshots",
1393 active_instruments.len()
1394 );
1395
1396 for instrument_id in active_instruments {
1397 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1399 Some(inst) => inst.clone(),
1400 None => {
1401 log::warn!(
1402 "Cannot refresh orderbook: no instrument for {instrument_id}"
1403 );
1404 continue;
1405 }
1406 };
1407
1408 let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1410 let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1411
1412 let snapshot = match snapshot_result {
1413 Ok(s) => s,
1414 Err(e) => {
1415 log::error!(
1416 "Failed to fetch orderbook snapshot for {instrument_id}: {e}"
1417 );
1418 continue;
1419 }
1420 };
1421
1422 let deltas_result = Self::parse_orderbook_snapshot(
1424 instrument_id,
1425 &snapshot,
1426 &instrument,
1427 );
1428
1429 let deltas = match deltas_result {
1430 Ok(d) => d,
1431 Err(e) => {
1432 log::error!(
1433 "Failed to parse orderbook snapshot for {instrument_id}: {e}"
1434 );
1435 continue;
1436 }
1437 };
1438
1439 if let Some(mut book) = order_books.get_mut(&instrument_id) {
1441 if let Err(e) = book.apply_deltas(&deltas) {
1442 log::error!(
1443 "Failed to apply orderbook snapshot for {instrument_id}: {e}"
1444 );
1445 continue;
1446 }
1447
1448 log::debug!(
1449 "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1450 instrument_id,
1451 book.best_bid_price(),
1452 book.best_ask_price()
1453 );
1454 }
1455
1456 let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1458 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1459 log::error!("Failed to emit orderbook snapshot: {e}");
1460 }
1461 }
1462 }
1463 }
1464 }
1465 });
1466
1467 self.tasks.push(task);
1468 Ok(())
1469 }
1470
1471 fn parse_orderbook_snapshot(
1475 instrument_id: InstrumentId,
1476 snapshot: &crate::http::models::OrderbookResponse,
1477 instrument: &InstrumentAny,
1478 ) -> anyhow::Result<OrderBookDeltas> {
1479 let ts_init = get_atomic_clock_realtime().get_time_ns();
1480 let mut deltas = Vec::new();
1481
1482 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1484
1485 let price_precision = instrument.price_precision();
1486 let size_precision = instrument.size_precision();
1487
1488 let bids_len = snapshot.bids.len();
1489 let asks_len = snapshot.asks.len();
1490
1491 for (idx, bid) in snapshot.bids.iter().enumerate() {
1493 let is_last = idx == bids_len - 1 && asks_len == 0;
1494 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1495
1496 let price = Price::from_decimal_dp(bid.price, price_precision)
1497 .context("failed to parse bid price")?;
1498 let size = Quantity::from_decimal_dp(bid.size, size_precision)
1499 .context("failed to parse bid size")?;
1500
1501 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1502 deltas.push(OrderBookDelta::new(
1503 instrument_id,
1504 BookAction::Add,
1505 order,
1506 flags,
1507 0,
1508 ts_init,
1509 ts_init,
1510 ));
1511 }
1512
1513 for (idx, ask) in snapshot.asks.iter().enumerate() {
1515 let is_last = idx == asks_len - 1;
1516 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1517
1518 let price = Price::from_decimal_dp(ask.price, price_precision)
1519 .context("failed to parse ask price")?;
1520 let size = Quantity::from_decimal_dp(ask.size, size_precision)
1521 .context("failed to parse ask size")?;
1522
1523 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1524 deltas.push(OrderBookDelta::new(
1525 instrument_id,
1526 BookAction::Add,
1527 order,
1528 flags,
1529 0,
1530 ts_init,
1531 ts_init,
1532 ));
1533 }
1534
1535 Ok(OrderBookDeltas::new(instrument_id, deltas))
1536 }
1537
1538 #[must_use]
1540 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1541 self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1542 }
1543
1544 #[must_use]
1546 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1547 self.instruments.iter().map(|i| i.clone()).collect()
1548 }
1549
1550 fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1551 self.order_books
1552 .entry(instrument_id)
1553 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1554 }
1555
1556 #[must_use]
1558 pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1559 self.bar_type_mappings
1560 .get(topic)
1561 .map(|entry| *entry.value())
1562 }
1563
1564 #[must_use]
1566 pub fn get_bar_topics(&self) -> Vec<String> {
1567 self.bar_type_mappings
1568 .iter()
1569 .map(|entry| entry.key().clone())
1570 .collect()
1571 }
1572
1573 fn candle_to_bar(
1580 candle: &crate::http::models::Candle,
1581 bar_type: BarType,
1582 price_precision: u8,
1583 size_precision: u8,
1584 bar_secs: i64,
1585 clock: &AtomicTime,
1586 ) -> anyhow::Result<Bar> {
1587 let ts_init =
1589 datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1590
1591 let ts_event_ns = ts_init
1593 .as_u64()
1594 .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1595 let ts_event = UnixNanos::from(ts_event_ns);
1596
1597 let open = Price::from_decimal_dp(candle.open, price_precision)
1598 .context("failed to parse candle open price")?;
1599 let high = Price::from_decimal_dp(candle.high, price_precision)
1600 .context("failed to parse candle high price")?;
1601 let low = Price::from_decimal_dp(candle.low, price_precision)
1602 .context("failed to parse candle low price")?;
1603 let close = Price::from_decimal_dp(candle.close, price_precision)
1604 .context("failed to parse candle close price")?;
1605
1606 let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1608 .context("failed to parse candle base_token_volume")?;
1609
1610 Ok(Bar::new(
1611 bar_type, open, high, low, close, volume, ts_event, ts_init,
1612 ))
1613 }
1614
1615 fn handle_ws_message(
1616 message: crate::websocket::enums::NautilusWsMessage,
1617 ctx: &WsMessageContext,
1618 ) {
1619 match message {
1620 crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1621 Self::handle_data_message(payloads, &ctx.data_sender, &ctx.incomplete_bars);
1622 }
1623 crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1624 Self::handle_deltas_message(
1625 *deltas,
1626 &ctx.data_sender,
1627 &ctx.order_books,
1628 &ctx.last_quotes,
1629 &ctx.instruments,
1630 );
1631 }
1632 crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1633 Self::handle_oracle_prices(oracle_prices, &ctx.instruments, &ctx.data_sender);
1634 }
1635 crate::websocket::enums::NautilusWsMessage::Error(err) => {
1636 log::error!("dYdX WS error: {err}");
1637 }
1638 crate::websocket::enums::NautilusWsMessage::Reconnected => {
1639 log::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1640
1641 let total_subs = ctx.active_orderbook_subs.len()
1642 + ctx.active_trade_subs.len()
1643 + ctx.active_bar_subs.len();
1644
1645 if total_subs == 0 {
1646 log::debug!("No active subscriptions to restore");
1647 return;
1648 }
1649
1650 log::info!(
1651 "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1652 total_subs,
1653 ctx.active_orderbook_subs.len(),
1654 ctx.active_trade_subs.len(),
1655 ctx.active_bar_subs.len()
1656 );
1657
1658 for entry in ctx.active_orderbook_subs.iter() {
1660 let instrument_id = *entry.key();
1661 let ws_clone = ctx.ws_client.clone();
1662 get_runtime().spawn(async move {
1663 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1664 log::error!(
1665 "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1666 );
1667 } else {
1668 log::debug!("Re-subscribed to orderbook for {instrument_id}");
1669 }
1670 });
1671 }
1672
1673 for entry in ctx.active_trade_subs.iter() {
1675 let instrument_id = *entry.key();
1676 let ws_clone = ctx.ws_client.clone();
1677 get_runtime().spawn(async move {
1678 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1679 log::error!(
1680 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1681 );
1682 } else {
1683 log::debug!("Re-subscribed to trades for {instrument_id}");
1684 }
1685 });
1686 }
1687
1688 for entry in ctx.active_bar_subs.iter() {
1690 let (instrument_id, resolution) = entry.key();
1691 let instrument_id = *instrument_id;
1692 let resolution = resolution.clone();
1693 let bar_type = *entry.value();
1694 let ws_clone = ctx.ws_client.clone();
1695
1696 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1698 let topic = format!("{ticker}/{resolution}");
1699 if let Err(e) = ctx.ws_client.send_command(
1700 crate::websocket::handler::HandlerCommand::RegisterBarType {
1701 topic,
1702 bar_type,
1703 },
1704 ) {
1705 log::warn!(
1706 "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1707 );
1708 }
1709
1710 get_runtime().spawn(async move {
1711 if let Err(e) =
1712 ws_clone.subscribe_candles(instrument_id, &resolution).await
1713 {
1714 log::error!(
1715 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1716 );
1717 } else {
1718 log::debug!(
1719 "Re-subscribed to candles for {instrument_id} ({resolution})"
1720 );
1721 }
1722 });
1723 }
1724
1725 log::info!("Completed re-subscription requests after reconnection");
1726 }
1727 crate::websocket::enums::NautilusWsMessage::BlockHeight(_) => {
1728 log::debug!(
1729 "Ignoring block height message on dYdX data client (handled by execution adapter)"
1730 );
1731 }
1732 crate::websocket::enums::NautilusWsMessage::Order(_)
1733 | crate::websocket::enums::NautilusWsMessage::Fill(_)
1734 | crate::websocket::enums::NautilusWsMessage::Position(_)
1735 | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1736 | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1737 | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1738 log::debug!(
1739 "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1740 );
1741 }
1742 }
1743 }
1744
1745 fn handle_data_message(
1746 payloads: Vec<NautilusData>,
1747 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1748 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1749 ) {
1750 for data in payloads {
1751 if let NautilusData::Bar(bar) = data {
1753 Self::handle_bar_message(bar, data_sender, incomplete_bars);
1754 } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1755 log::error!("Failed to emit data event: {e}");
1756 }
1757 }
1758 }
1759
1760 fn handle_bar_message(
1767 bar: Bar,
1768 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1769 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1770 ) {
1771 let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1772 let bar_type = bar.bar_type;
1773
1774 if bar.ts_event <= current_time_ns {
1775 incomplete_bars.remove(&bar_type);
1777 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1778 log::error!("Failed to emit completed bar: {e}");
1779 }
1780 } else {
1781 log::trace!(
1783 "Caching incomplete bar for {} (ts_event={}, current={})",
1784 bar_type,
1785 bar.ts_event,
1786 current_time_ns
1787 );
1788 incomplete_bars.insert(bar_type, bar);
1789 }
1790 }
1791
1792 fn resolve_crossed_order_book(
1810 book: &mut OrderBook,
1811 venue_deltas: OrderBookDeltas,
1812 instrument: &InstrumentAny,
1813 ) -> anyhow::Result<OrderBookDeltas> {
1814 let instrument_id = venue_deltas.instrument_id;
1815 let ts_init = venue_deltas.ts_init;
1816 let mut all_deltas = venue_deltas.deltas.clone();
1817
1818 book.apply_deltas(&venue_deltas)?;
1820
1821 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1823 (book.best_bid_price(), book.best_ask_price())
1824 {
1825 bid_price >= ask_price
1826 } else {
1827 false
1828 };
1829
1830 while is_crossed {
1832 log::debug!(
1833 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1834 instrument_id,
1835 book.best_bid_price(),
1836 book.best_ask_price()
1837 );
1838
1839 let bid_price = match book.best_bid_price() {
1840 Some(p) => p,
1841 None => break,
1842 };
1843 let ask_price = match book.best_ask_price() {
1844 Some(p) => p,
1845 None => break,
1846 };
1847 let bid_size = match book.best_bid_size() {
1848 Some(s) => s,
1849 None => break,
1850 };
1851 let ask_size = match book.best_ask_size() {
1852 Some(s) => s,
1853 None => break,
1854 };
1855
1856 let mut temp_deltas = Vec::new();
1857
1858 if bid_size > ask_size {
1859 let new_bid_size = Quantity::new(
1861 bid_size.as_f64() - ask_size.as_f64(),
1862 instrument.size_precision(),
1863 );
1864 temp_deltas.push(OrderBookDelta::new(
1865 instrument_id,
1866 BookAction::Update,
1867 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1868 0,
1869 0,
1870 ts_init,
1871 ts_init,
1872 ));
1873 temp_deltas.push(OrderBookDelta::new(
1874 instrument_id,
1875 BookAction::Delete,
1876 BookOrder::new(
1877 OrderSide::Sell,
1878 ask_price,
1879 Quantity::new(0.0, instrument.size_precision()),
1880 0,
1881 ),
1882 0,
1883 0,
1884 ts_init,
1885 ts_init,
1886 ));
1887 } else if bid_size < ask_size {
1888 let new_ask_size = Quantity::new(
1890 ask_size.as_f64() - bid_size.as_f64(),
1891 instrument.size_precision(),
1892 );
1893 temp_deltas.push(OrderBookDelta::new(
1894 instrument_id,
1895 BookAction::Update,
1896 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1897 0,
1898 0,
1899 ts_init,
1900 ts_init,
1901 ));
1902 temp_deltas.push(OrderBookDelta::new(
1903 instrument_id,
1904 BookAction::Delete,
1905 BookOrder::new(
1906 OrderSide::Buy,
1907 bid_price,
1908 Quantity::new(0.0, instrument.size_precision()),
1909 0,
1910 ),
1911 0,
1912 0,
1913 ts_init,
1914 ts_init,
1915 ));
1916 } else {
1917 temp_deltas.push(OrderBookDelta::new(
1919 instrument_id,
1920 BookAction::Delete,
1921 BookOrder::new(
1922 OrderSide::Buy,
1923 bid_price,
1924 Quantity::new(0.0, instrument.size_precision()),
1925 0,
1926 ),
1927 0,
1928 0,
1929 ts_init,
1930 ts_init,
1931 ));
1932 temp_deltas.push(OrderBookDelta::new(
1933 instrument_id,
1934 BookAction::Delete,
1935 BookOrder::new(
1936 OrderSide::Sell,
1937 ask_price,
1938 Quantity::new(0.0, instrument.size_precision()),
1939 0,
1940 ),
1941 0,
1942 0,
1943 ts_init,
1944 ts_init,
1945 ));
1946 }
1947
1948 let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1950 book.apply_deltas(&temp_deltas_obj)?;
1951 all_deltas.extend(temp_deltas);
1952
1953 is_crossed = if let (Some(bid_price), Some(ask_price)) =
1955 (book.best_bid_price(), book.best_ask_price())
1956 {
1957 bid_price >= ask_price
1958 } else {
1959 false
1960 };
1961 }
1962
1963 if let Some(last_delta) = all_deltas.last_mut() {
1965 last_delta.flags = RecordFlag::F_LAST as u8;
1966 }
1967
1968 Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1969 }
1970
1971 fn handle_deltas_message(
1972 deltas: OrderBookDeltas,
1973 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1974 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1975 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1976 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
1977 ) {
1978 let instrument_id = deltas.instrument_id;
1979
1980 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1982 Some(inst) => inst.clone(),
1983 None => {
1984 log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1985 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1987 OrderBookDeltas_API::new(deltas),
1988 ))) {
1989 log::error!("Failed to emit order book deltas: {e}");
1990 }
1991 return;
1992 }
1993 };
1994
1995 let mut book = order_books
1997 .entry(instrument_id)
1998 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1999
2000 let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2002 {
2003 Ok(d) => d,
2004 Err(e) => {
2005 log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2006 return;
2007 }
2008 };
2009
2010 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2013 (book.best_bid_price(), book.best_ask_price())
2014 && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2015 {
2016 Some(QuoteTick::new(
2017 instrument_id,
2018 bid_price,
2019 ask_price,
2020 bid_size,
2021 ask_size,
2022 resolved_deltas.ts_event,
2023 resolved_deltas.ts_init,
2024 ))
2025 } else {
2026 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2028 log::debug!(
2029 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2030 );
2031 last_quotes.get(&instrument_id).map(|q| *q)
2032 } else {
2033 None
2034 }
2035 };
2036
2037 if let Some(quote) = quote_opt {
2038 let emit_quote =
2040 !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2041
2042 if emit_quote {
2043 last_quotes.insert(instrument_id, quote);
2044 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2045 log::error!("Failed to emit quote tick: {e}");
2046 }
2047 }
2048 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2049 log::debug!(
2051 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2052 book.best_bid_price(),
2053 book.best_ask_price()
2054 );
2055 }
2056
2057 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2059 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2060 log::error!("Failed to emit order book deltas event: {e}");
2061 }
2062 }
2063
2064 fn handle_oracle_prices(
2065 oracle_prices: std::collections::HashMap<
2066 String,
2067 crate::websocket::messages::DydxOraclePriceMarket,
2068 >,
2069 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2070 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2071 ) {
2072 let ts_init = get_atomic_clock_realtime().get_time_ns();
2073
2074 for (symbol_str, oracle_market) in oracle_prices {
2075 let perp_symbol = format!("{symbol_str}-PERP");
2078 let symbol = Ustr::from(&perp_symbol);
2079
2080 let Some(instrument) = instruments.get(&symbol) else {
2082 log::debug!(
2083 "Received oracle price for unknown instrument (not cached yet): symbol={symbol}"
2084 );
2085 continue;
2086 };
2087
2088 let instrument_id = instrument.id();
2089
2090 let oracle_price_str = &oracle_market.oracle_price;
2092 let Ok(oracle_price_dec) = oracle_price_str.parse::<Decimal>() else {
2093 log::error!(
2094 "Failed to parse oracle price: symbol={symbol}, price_str={oracle_price_str}"
2095 );
2096 continue;
2097 };
2098
2099 let price_precision = instrument.price_precision();
2100 let Ok(oracle_price) = Price::from_decimal_dp(oracle_price_dec, price_precision) else {
2101 log::error!(
2102 "Failed to create oracle Price: symbol={symbol}, price={oracle_price_dec}"
2103 );
2104 continue;
2105 };
2106
2107 let oracle_price_event = DydxOraclePrice::new(
2108 instrument_id,
2109 oracle_price,
2110 ts_init, ts_init,
2112 );
2113
2114 log::debug!(
2115 "Received dYdX oracle price: instrument_id={instrument_id}, oracle_price={oracle_price}, {oracle_price_event:?}"
2116 );
2117
2118 let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2119 instrument_id,
2120 oracle_price,
2121 ts_init, ts_init,
2123 ));
2124
2125 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2126 log::error!("Failed to emit oracle price: {e}");
2127 }
2128 }
2129 }
2130}
2131
2132#[cfg(test)]
2133mod tests {
2134 use std::{collections::HashMap, net::SocketAddr, time::Duration};
2135
2136 use axum::{
2137 Router,
2138 extract::{Path, Query, State},
2139 response::Json,
2140 routing::get,
2141 };
2142 use chrono::Utc;
2143 use indexmap::IndexMap;
2144 use nautilus_common::{
2145 live::runner::set_data_event_sender,
2146 messages::{DataEvent, data::DataResponse},
2147 testing::wait_until_async,
2148 };
2149 use nautilus_core::UUID4;
2150 use nautilus_model::{
2151 data::{
2152 BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2153 TradeTick, order::BookOrder,
2154 },
2155 enums::{
2156 AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2157 PriceType,
2158 },
2159 identifiers::{ClientId, InstrumentId, Symbol, Venue},
2160 instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2161 orderbook::OrderBook,
2162 types::{Currency, Price, Quantity},
2163 };
2164 use rstest::rstest;
2165 use rust_decimal::Decimal;
2166 use rust_decimal_macros::dec;
2167 use tokio::net::{TcpListener, TcpStream};
2168
2169 use super::*;
2170 use crate::http::models::{Candle, CandlesResponse};
2171
2172 fn setup_test_env() {
2173 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2175 set_data_event_sender(sender);
2176 }
2177
2178 async fn wait_for_server(addr: SocketAddr) {
2179 wait_until_async(
2180 || async move { TcpStream::connect(addr).await.is_ok() },
2181 Duration::from_secs(5),
2182 )
2183 .await;
2184 }
2185
2186 fn create_test_ws_client() -> DydxWebSocketClient {
2187 DydxWebSocketClient::new_public("ws://test".to_string(), None)
2188 }
2189
2190 #[rstest]
2191 fn test_new_data_client() {
2192 setup_test_env();
2193
2194 let client_id = ClientId::from("DYDX-001");
2195 let config = DydxDataClientConfig::default();
2196 let http_client = DydxHttpClient::default();
2197
2198 let client = DydxDataClient::new(client_id, config, http_client, create_test_ws_client());
2199 assert!(client.is_ok());
2200
2201 let client = client.unwrap();
2202 assert_eq!(client.client_id(), client_id);
2203 assert_eq!(client.venue(), *DYDX_VENUE);
2204 assert!(!client.is_connected());
2205 }
2206
2207 #[tokio::test]
2208 async fn test_data_client_lifecycle() {
2209 setup_test_env();
2210
2211 let client_id = ClientId::from("DYDX-001");
2212 let config = DydxDataClientConfig::default();
2213 let http_client = DydxHttpClient::default();
2214
2215 let mut client =
2216 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2217
2218 assert!(client.start().is_ok());
2220
2221 assert!(client.stop().is_ok());
2223 assert!(!client.is_connected());
2224
2225 assert!(client.reset().is_ok());
2227
2228 assert!(client.dispose().is_ok());
2230 }
2231
2232 #[rstest]
2233 fn test_subscribe_unsubscribe_instruments_noop() {
2234 setup_test_env();
2235
2236 let client_id = ClientId::from("DYDX-TEST");
2237 let config = DydxDataClientConfig::default();
2238 let http_client = DydxHttpClient::default();
2239
2240 let mut client =
2241 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2242
2243 let venue = *DYDX_VENUE;
2244 let command_id = UUID4::new();
2245 let ts_init = get_atomic_clock_realtime().get_time_ns();
2246
2247 let subscribe = SubscribeInstruments {
2248 client_id: Some(client_id),
2249 venue,
2250 command_id,
2251 ts_init,
2252 correlation_id: None,
2253 params: None,
2254 };
2255 let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None, None);
2256
2257 assert!(client.subscribe_instruments(&subscribe).is_ok());
2259 assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2260 }
2261
2262 #[rstest]
2263 fn test_bar_type_mappings_registration() {
2264 setup_test_env();
2265
2266 let client_id = ClientId::from("DYDX-TEST");
2267 let config = DydxDataClientConfig::default();
2268 let http_client = DydxHttpClient::default();
2269
2270 let client =
2271 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2272
2273 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2274 let spec = BarSpecification {
2275 step: std::num::NonZeroUsize::new(1).unwrap(),
2276 aggregation: BarAggregation::Minute,
2277 price_type: PriceType::Last,
2278 };
2279 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2280
2281 assert!(client.get_bar_topics().is_empty());
2283 assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2284
2285 client
2287 .bar_type_mappings
2288 .insert("BTC-USD/1MIN".to_string(), bar_type);
2289
2290 assert_eq!(client.get_bar_topics().len(), 1);
2292 assert!(
2293 client
2294 .get_bar_topics()
2295 .contains(&"BTC-USD/1MIN".to_string())
2296 );
2297 assert_eq!(
2298 client.get_bar_type_for_topic("BTC-USD/1MIN"),
2299 Some(bar_type)
2300 );
2301
2302 let spec_5min = BarSpecification {
2304 step: std::num::NonZeroUsize::new(5).unwrap(),
2305 aggregation: BarAggregation::Minute,
2306 price_type: PriceType::Last,
2307 };
2308 let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2309 client
2310 .bar_type_mappings
2311 .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2312
2313 assert_eq!(client.get_bar_topics().len(), 2);
2315 assert_eq!(
2316 client.get_bar_type_for_topic("BTC-USD/5MINS"),
2317 Some(bar_type_5min)
2318 );
2319 }
2320
2321 #[rstest]
2322 fn test_bar_type_mappings_unregistration() {
2323 setup_test_env();
2324
2325 let client_id = ClientId::from("DYDX-TEST");
2326 let config = DydxDataClientConfig::default();
2327 let http_client = DydxHttpClient::default();
2328
2329 let client =
2330 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2331
2332 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2333 let spec = BarSpecification {
2334 step: std::num::NonZeroUsize::new(1).unwrap(),
2335 aggregation: BarAggregation::Hour,
2336 price_type: PriceType::Last,
2337 };
2338 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2339
2340 client
2342 .bar_type_mappings
2343 .insert("ETH-USD/1HOUR".to_string(), bar_type);
2344 assert_eq!(client.get_bar_topics().len(), 1);
2345
2346 client.bar_type_mappings.remove("ETH-USD/1HOUR");
2348
2349 assert!(client.get_bar_topics().is_empty());
2351 assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2352 }
2353
2354 #[rstest]
2355 fn test_bar_type_mappings_lookup_nonexistent() {
2356 setup_test_env();
2357
2358 let client_id = ClientId::from("DYDX-TEST");
2359 let config = DydxDataClientConfig::default();
2360 let http_client = DydxHttpClient::default();
2361
2362 let client =
2363 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2364
2365 assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2367 assert!(client.get_bar_topics().is_empty());
2368 }
2369
2370 #[tokio::test]
2371 async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2372 setup_test_env();
2373
2374 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2375 let instruments = Arc::new(DashMap::new());
2376 let order_books = Arc::new(DashMap::new());
2377 let last_quotes = Arc::new(DashMap::new());
2378 let ws_client = DydxWebSocketClient::new_public("ws://test".to_string(), None);
2379 let active_orderbook_subs = Arc::new(DashMap::new());
2380 let active_trade_subs = Arc::new(DashMap::new());
2381 let active_bar_subs = Arc::new(DashMap::new());
2382
2383 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2384 let bar_ts = get_atomic_clock_realtime().get_time_ns();
2385
2386 let symbol = Symbol::from("BTC-USD-PERP");
2388 let instrument = CryptoPerpetual::new(
2389 instrument_id,
2390 symbol,
2391 Currency::BTC(),
2392 Currency::USD(),
2393 Currency::USD(),
2394 false,
2395 2,
2396 4,
2397 Price::from("0.01"),
2398 Quantity::from("0.0001"),
2399 None,
2400 None,
2401 None,
2402 None,
2403 None,
2404 None,
2405 None,
2406 None,
2407 None,
2408 None,
2409 None,
2410 None,
2411 bar_ts,
2412 bar_ts,
2413 );
2414 instruments.insert(
2415 Ustr::from("BTC-USD-PERP"),
2416 InstrumentAny::CryptoPerpetual(instrument),
2417 );
2418
2419 let price = Price::from("100.00");
2420 let size = Quantity::from("1.0");
2421
2422 let bid_delta = OrderBookDelta::new(
2424 instrument_id,
2425 BookAction::Add,
2426 BookOrder::new(OrderSide::Buy, price, size, 1),
2427 0,
2428 1,
2429 bar_ts,
2430 bar_ts,
2431 );
2432 let ask_delta = OrderBookDelta::new(
2433 instrument_id,
2434 BookAction::Add,
2435 BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2436 0,
2437 1,
2438 bar_ts,
2439 bar_ts,
2440 );
2441 let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2442
2443 let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2444
2445 let incomplete_bars = Arc::new(DashMap::new());
2446 let ctx = WsMessageContext {
2447 data_sender: sender,
2448 instruments,
2449 order_books,
2450 last_quotes,
2451 ws_client,
2452 active_orderbook_subs,
2453 active_trade_subs,
2454 active_bar_subs,
2455 incomplete_bars,
2456 };
2457 DydxDataClient::handle_ws_message(message, &ctx);
2458
2459 assert!(ctx.order_books.get(&instrument_id).is_some());
2461 assert!(ctx.last_quotes.get(&instrument_id).is_some());
2462
2463 let mut saw_quote = false;
2465 let mut saw_deltas = false;
2466
2467 while let Ok(event) = rx.try_recv() {
2468 if let DataEvent::Data(data) = event {
2469 match data {
2470 NautilusData::Quote(_) => saw_quote = true,
2471 NautilusData::Deltas(_) => saw_deltas = true,
2472 _ => {}
2473 }
2474 }
2475 }
2476
2477 assert!(saw_quote);
2478 assert!(saw_deltas);
2479 }
2480
2481 #[rstest]
2482 fn test_handle_ws_message_error_does_not_panic() {
2483 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2486 let instruments = Arc::new(DashMap::new());
2487 let order_books = Arc::new(DashMap::new());
2488 let last_quotes = Arc::new(DashMap::new());
2489 let ws_client = DydxWebSocketClient::new_public("ws://test".to_string(), None);
2490 let active_orderbook_subs = Arc::new(DashMap::new());
2491 let active_trade_subs = Arc::new(DashMap::new());
2492 let active_bar_subs = Arc::new(DashMap::new());
2493 let incomplete_bars = Arc::new(DashMap::new());
2494
2495 let ctx = WsMessageContext {
2496 data_sender: sender,
2497 instruments,
2498 order_books,
2499 last_quotes,
2500 ws_client,
2501 active_orderbook_subs,
2502 active_trade_subs,
2503 active_bar_subs,
2504 incomplete_bars,
2505 };
2506
2507 let err = crate::websocket::error::DydxWebSocketError::from_message(
2508 "malformed WebSocket payload".to_string(),
2509 );
2510
2511 DydxDataClient::handle_ws_message(
2512 crate::websocket::enums::NautilusWsMessage::Error(err),
2513 &ctx,
2514 );
2515 }
2516
2517 #[tokio::test]
2518 async fn test_request_bars_partitioning_math_does_not_panic() {
2519 setup_test_env();
2520
2521 let client_id = ClientId::from("DYDX-BARS");
2522 let config = DydxDataClientConfig::default();
2523 let http_client = DydxHttpClient::default();
2524
2525 let client =
2526 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2527
2528 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2529 let spec = BarSpecification {
2530 step: std::num::NonZeroUsize::new(1).unwrap(),
2531 aggregation: BarAggregation::Minute,
2532 price_type: PriceType::Last,
2533 };
2534 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2535
2536 let now = Utc::now();
2537 let start = Some(now - chrono::Duration::hours(10));
2538 let end = Some(now);
2539
2540 let request = RequestBars::new(
2541 bar_type,
2542 start,
2543 end,
2544 None,
2545 Some(client_id),
2546 UUID4::new(),
2547 get_atomic_clock_realtime().get_time_ns(),
2548 None,
2549 );
2550
2551 assert!(client.request_bars(&request).is_ok());
2554 }
2555
2556 #[tokio::test]
2557 async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2558 setup_test_env();
2559
2560 let now = Utc::now();
2562 let candle = crate::http::models::Candle {
2563 started_at: now - chrono::Duration::minutes(1),
2564 ticker: "BTC-USD".to_string(),
2565 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2566 open: dec!(100.0),
2567 high: dec!(101.0),
2568 low: dec!(99.0),
2569 close: dec!(100.5),
2570 base_token_volume: dec!(1.0),
2571 usd_volume: dec!(100.0),
2572 trades: 10,
2573 starting_open_interest: dec!(1000.0),
2574 };
2575 let candles_response = crate::http::models::CandlesResponse {
2576 candles: vec![candle],
2577 };
2578 let state = CandlesTestState {
2579 response: Arc::new(candles_response),
2580 };
2581 let addr = start_candles_test_server(state).await;
2582 let base_url = format!("http://{addr}");
2583
2584 let client_id = ClientId::from("DYDX-BARS-MONTHS");
2585 let config = DydxDataClientConfig {
2586 base_url_http: Some(base_url),
2587 is_testnet: true,
2588 ..Default::default()
2589 };
2590
2591 let http_client = DydxHttpClient::new(
2592 config.base_url_http.clone(),
2593 config.http_timeout_secs,
2594 config.http_proxy_url.clone(),
2595 config.is_testnet,
2596 None,
2597 )
2598 .unwrap();
2599
2600 let client =
2601 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2602
2603 let instrument = create_test_instrument_any();
2605 let instrument_id = instrument.id();
2606 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2607 client.instruments.insert(symbol_key, instrument);
2608
2609 let spec = BarSpecification {
2610 step: std::num::NonZeroUsize::new(1).unwrap(),
2611 aggregation: BarAggregation::Minute,
2612 price_type: PriceType::Last,
2613 };
2614 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2615
2616 let start = Some(now - chrono::Duration::days(90));
2618 let end = Some(now);
2619
2620 let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2622
2623 let request = RequestBars::new(
2624 bar_type,
2625 start,
2626 end,
2627 limit,
2628 Some(client_id),
2629 UUID4::new(),
2630 get_atomic_clock_realtime().get_time_ns(),
2631 None,
2632 );
2633
2634 assert!(client.request_bars(&request).is_ok());
2635 }
2636
2637 #[derive(Clone)]
2638 struct OrderbookTestState {
2639 snapshot: Arc<crate::http::models::OrderbookResponse>,
2640 }
2641
2642 #[derive(Clone)]
2643 struct TradesTestState {
2644 response: Arc<crate::http::models::TradesResponse>,
2645 last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2646 last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2647 }
2648
2649 #[derive(Clone)]
2650 struct CandlesTestState {
2651 response: Arc<crate::http::models::CandlesResponse>,
2652 }
2653
2654 async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2655 async fn handle_orderbook(
2656 Path(_ticker): Path<String>,
2657 State(state): State<OrderbookTestState>,
2658 ) -> Json<crate::http::models::OrderbookResponse> {
2659 Json((*state.snapshot).clone())
2660 }
2661
2662 let router = Router::new().route(
2663 "/v4/orderbooks/perpetualMarket/{ticker}",
2664 get(handle_orderbook).with_state(state),
2665 );
2666
2667 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2668 let addr = listener.local_addr().unwrap();
2669
2670 get_runtime().spawn(async move {
2671 axum::serve(listener, router.into_make_service())
2672 .await
2673 .unwrap();
2674 });
2675
2676 wait_for_server(addr).await;
2677 addr
2678 }
2679
2680 async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2681 async fn handle_trades(
2682 Path(ticker): Path<String>,
2683 Query(params): Query<HashMap<String, String>>,
2684 State(state): State<TradesTestState>,
2685 ) -> Json<crate::http::models::TradesResponse> {
2686 {
2687 let mut last_ticker = state.last_ticker.lock().await;
2688 *last_ticker = Some(ticker);
2689 }
2690
2691 let limit = params
2692 .get("limit")
2693 .and_then(|value| value.parse::<u32>().ok());
2694 {
2695 let mut last_limit = state.last_limit.lock().await;
2696 *last_limit = Some(limit);
2697 }
2698
2699 Json((*state.response).clone())
2700 }
2701
2702 let router = Router::new().route(
2703 "/v4/trades/perpetualMarket/{ticker}",
2704 get(handle_trades).with_state(state),
2705 );
2706
2707 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2708 let addr = listener.local_addr().unwrap();
2709
2710 get_runtime().spawn(async move {
2711 axum::serve(listener, router.into_make_service())
2712 .await
2713 .unwrap();
2714 });
2715
2716 wait_for_server(addr).await;
2717 addr
2718 }
2719
2720 async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2721 async fn handle_candles(
2722 Path(_ticker): Path<String>,
2723 Query(_params): Query<HashMap<String, String>>,
2724 State(state): State<CandlesTestState>,
2725 ) -> Json<crate::http::models::CandlesResponse> {
2726 Json((*state.response).clone())
2727 }
2728
2729 let router = Router::new().route(
2730 "/v4/candles/perpetualMarkets/{ticker}",
2731 get(handle_candles).with_state(state),
2732 );
2733
2734 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2735 let addr = listener.local_addr().unwrap();
2736
2737 get_runtime().spawn(async move {
2738 axum::serve(listener, router.into_make_service())
2739 .await
2740 .unwrap();
2741 });
2742
2743 wait_for_server(addr).await;
2744 addr
2745 }
2746
2747 fn create_test_instrument_any() -> InstrumentAny {
2748 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2749
2750 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2751 instrument_id,
2752 instrument_id.symbol,
2753 Currency::BTC(),
2754 Currency::USD(),
2755 Currency::USD(),
2756 false,
2757 2, 8, Price::new(0.01, 2), Quantity::new(0.001, 8), Some(Quantity::new(1.0, 0)), Some(Quantity::new(0.001, 8)), Some(Quantity::new(100000.0, 8)), Some(Quantity::new(0.001, 8)), None, None, Some(Price::new(1000000.0, 2)), Some(Price::new(0.01, 2)), Some(dec!(0.05)), Some(dec!(0.03)), Some(dec!(0.0002)), Some(dec!(0.0005)), UnixNanos::default(), UnixNanos::default(), ))
2776 }
2777
2778 #[tokio::test]
2783 async fn test_candle_to_bar_price_size_edge_cases() {
2784 setup_test_env();
2785
2786 let clock = get_atomic_clock_realtime();
2787 let now = Utc::now();
2788
2789 let candle = Candle {
2791 started_at: now,
2792 ticker: "BTC-USD".to_string(),
2793 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2794 open: dec!(123456789.123456),
2795 high: dec!(987654321.987654), low: dec!(123456.789), close: dec!(223456789.123456), base_token_volume: dec!(0.00000001),
2799 usd_volume: dec!(1234500.0),
2800 trades: 42,
2801 starting_open_interest: dec!(1000.0),
2802 };
2803
2804 let instrument = create_test_instrument_any();
2805 let instrument_id = instrument.id();
2806 let spec = BarSpecification {
2807 step: std::num::NonZeroUsize::new(1).unwrap(),
2808 aggregation: BarAggregation::Minute,
2809 price_type: PriceType::Last,
2810 };
2811 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2812
2813 let bar = DydxDataClient::candle_to_bar(
2814 &candle,
2815 bar_type,
2816 instrument.price_precision(),
2817 instrument.size_precision(),
2818 60,
2819 clock,
2820 )
2821 .expect("candle_to_bar should handle large/scientific values");
2822
2823 assert!(bar.open.as_f64() > 0.0);
2824 assert!(bar.high.as_f64() >= bar.low.as_f64());
2825 assert!(bar.volume.as_f64() > 0.0);
2826 }
2827
2828 #[tokio::test]
2829 async fn test_candle_to_bar_ts_event_overflow_safe() {
2830 setup_test_env();
2831
2832 let clock = get_atomic_clock_realtime();
2833 let now = Utc::now();
2834
2835 let candle = Candle {
2836 started_at: now,
2837 ticker: "BTC-USD".to_string(),
2838 resolution: crate::common::enums::DydxCandleResolution::OneDay,
2839 open: Decimal::from(1),
2840 high: Decimal::from(1),
2841 low: Decimal::from(1),
2842 close: Decimal::from(1),
2843 base_token_volume: Decimal::from(1),
2844 usd_volume: Decimal::from(1),
2845 trades: 1,
2846 starting_open_interest: Decimal::from(1),
2847 };
2848
2849 let instrument = create_test_instrument_any();
2850 let instrument_id = instrument.id();
2851 let spec = BarSpecification {
2852 step: std::num::NonZeroUsize::new(1).unwrap(),
2853 aggregation: BarAggregation::Day,
2854 price_type: PriceType::Last,
2855 };
2856 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2857
2858 let bar_secs = i64::MAX / 1_000_000_000;
2860 let bar = DydxDataClient::candle_to_bar(
2861 &candle,
2862 bar_type,
2863 instrument.price_precision(),
2864 instrument.size_precision(),
2865 bar_secs,
2866 clock,
2867 )
2868 .expect("candle_to_bar should not overflow on ts_event");
2869
2870 assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2871 }
2872
2873 #[tokio::test]
2874 async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2875 let clock = get_atomic_clock_realtime();
2878 let now = Utc::now();
2879
2880 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2883 set_data_event_sender(sender);
2884
2885 let candle_past = Candle {
2887 started_at: now - chrono::Duration::minutes(2),
2888 ticker: "BTC-USD".to_string(),
2889 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2890 open: Decimal::from(1),
2891 high: Decimal::from(2),
2892 low: Decimal::from(1),
2893 close: Decimal::from(1),
2894 base_token_volume: Decimal::from(1),
2895 usd_volume: Decimal::from(1),
2896 trades: 1,
2897 starting_open_interest: Decimal::from(1),
2898 };
2899 let candle_future = Candle {
2900 started_at: now + chrono::Duration::minutes(2),
2901 ..candle_past.clone()
2902 };
2903
2904 let candles_response = CandlesResponse {
2905 candles: vec![candle_past, candle_future],
2906 };
2907
2908 let state = CandlesTestState {
2909 response: Arc::new(candles_response),
2910 };
2911 let addr = start_candles_test_server(state).await;
2912 let base_url = format!("http://{addr}");
2913
2914 let client_id = ClientId::from("DYDX-BARS-SKEW");
2915 let config = DydxDataClientConfig {
2916 base_url_http: Some(base_url),
2917 is_testnet: true,
2918 ..Default::default()
2919 };
2920
2921 let http_client = DydxHttpClient::new(
2922 config.base_url_http.clone(),
2923 config.http_timeout_secs,
2924 config.http_proxy_url.clone(),
2925 config.is_testnet,
2926 None,
2927 )
2928 .unwrap();
2929
2930 let client =
2931 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
2932
2933 let instrument = create_test_instrument_any();
2934 let instrument_id = instrument.id();
2935 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
2936 client.instruments.insert(symbol_key, instrument);
2937
2938 let spec = BarSpecification {
2939 step: std::num::NonZeroUsize::new(1).unwrap(),
2940 aggregation: BarAggregation::Minute,
2941 price_type: PriceType::Last,
2942 };
2943 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2944
2945 let request = RequestBars::new(
2946 bar_type,
2947 Some(now - chrono::Duration::minutes(5)),
2948 Some(now + chrono::Duration::minutes(5)),
2949 None,
2950 Some(client_id),
2951 UUID4::new(),
2952 clock.get_time_ns(),
2953 None,
2954 );
2955
2956 assert!(client.request_bars(&request).is_ok());
2957
2958 let timeout = tokio::time::Duration::from_secs(3);
2959 if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
2960 tokio::time::timeout(timeout, rx.recv()).await
2961 {
2962 assert_eq!(resp.data.len(), 1);
2964 }
2965 }
2966
2967 #[rstest]
2968 fn test_decimal_to_f64_precision_loss_within_tolerance() {
2969 let price_value = 12345.125_f64;
2971 let qty_value = 0.00012345_f64;
2972
2973 let price = Price::new(price_value, 6);
2974 let qty = Quantity::new(qty_value, 8);
2975
2976 let price_diff = (price.as_f64() - price_value).abs();
2977 let qty_diff = (qty.as_f64() - qty_value).abs();
2978
2979 assert!(price_diff < 1e-10);
2981 assert!(qty_diff < 1e-12);
2982 }
2983
2984 #[tokio::test]
2985 async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
2986 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2988 set_data_event_sender(sender);
2989
2990 let snapshot = crate::http::models::OrderbookResponse {
2992 bids: vec![crate::http::models::OrderbookLevel {
2993 price: dec!(100.0),
2994 size: dec!(1.0),
2995 }],
2996 asks: vec![crate::http::models::OrderbookLevel {
2997 price: dec!(101.0),
2998 size: dec!(2.0),
2999 }],
3000 };
3001 let state = OrderbookTestState {
3002 snapshot: Arc::new(snapshot),
3003 };
3004 let addr = start_orderbook_test_server(state).await;
3005 let base_url = format!("http://{addr}");
3006
3007 let client_id = ClientId::from("DYDX-REFRESH");
3009 let config = DydxDataClientConfig {
3010 is_testnet: true,
3011 base_url_http: Some(base_url),
3012 orderbook_refresh_interval_secs: Some(1),
3013 instrument_refresh_interval_secs: None,
3014 ..Default::default()
3015 };
3016
3017 let http_client = DydxHttpClient::new(
3018 config.base_url_http.clone(),
3019 config.http_timeout_secs,
3020 config.http_proxy_url.clone(),
3021 config.is_testnet,
3022 None,
3023 )
3024 .unwrap();
3025
3026 let mut client =
3027 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3028
3029 let instrument = create_test_instrument_any();
3031 let instrument_id = instrument.id();
3032 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3033 client.instruments.insert(symbol_key, instrument);
3034 client.order_books.insert(
3035 instrument_id,
3036 OrderBook::new(instrument_id, BookType::L2_MBP),
3037 );
3038 client.active_orderbook_subs.insert(instrument_id, ());
3039
3040 client.start_orderbook_refresh_task().unwrap();
3042
3043 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3044 let mut saw_snapshot_event = false;
3045
3046 while std::time::Instant::now() < deadline {
3047 if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3048 tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3049 {
3050 saw_snapshot_event = true;
3051 break;
3052 }
3053 }
3054
3055 assert!(
3056 saw_snapshot_event,
3057 "expected at least one snapshot deltas event from refresh task"
3058 );
3059
3060 let book = client
3062 .order_books
3063 .get(&instrument_id)
3064 .expect("orderbook should exist after refresh");
3065 let best_bid = book.best_bid_price().expect("best bid should be set");
3066 let best_ask = book.best_ask_price().expect("best ask should be set");
3067
3068 assert_eq!(best_bid, Price::from("100.00"));
3069 assert_eq!(best_ask, Price::from("101.00"));
3070 }
3071
3072 #[rstest]
3073 fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3074 let instrument = create_test_instrument_any();
3077 let instrument_id = instrument.id();
3078 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3079 let ts_init = get_atomic_clock_realtime().get_time_ns();
3080
3081 let initial_deltas = vec![
3083 OrderBookDelta::new(
3084 instrument_id,
3085 BookAction::Add,
3086 BookOrder::new(
3087 OrderSide::Buy,
3088 Price::from("99.00"),
3089 Quantity::from("1.0"),
3090 0,
3091 ),
3092 0,
3093 0,
3094 ts_init,
3095 ts_init,
3096 ),
3097 OrderBookDelta::new(
3098 instrument_id,
3099 BookAction::Add,
3100 BookOrder::new(
3101 OrderSide::Sell,
3102 Price::from("101.00"),
3103 Quantity::from("2.0"),
3104 0,
3105 ),
3106 0,
3107 0,
3108 ts_init,
3109 ts_init,
3110 ),
3111 ];
3112 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3113 .unwrap();
3114
3115 let crossed_deltas = vec![OrderBookDelta::new(
3117 instrument_id,
3118 BookAction::Add,
3119 BookOrder::new(
3120 OrderSide::Buy,
3121 Price::from("102.00"),
3122 Quantity::from("5.0"),
3123 0,
3124 ),
3125 0,
3126 0,
3127 ts_init,
3128 ts_init,
3129 )];
3130 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3131
3132 let resolved =
3133 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3134 .unwrap();
3135
3136 assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3139 assert!(book.best_bid_size().unwrap().as_f64() < 5.0); assert!(
3141 book.best_ask_price().is_none()
3142 || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3143 ); assert!(resolved.deltas.len() > 1); }
3148
3149 #[rstest]
3150 fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3151 let instrument = create_test_instrument_any();
3154 let instrument_id = instrument.id();
3155 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3156 let ts_init = get_atomic_clock_realtime().get_time_ns();
3157
3158 let initial_deltas = vec![
3160 OrderBookDelta::new(
3161 instrument_id,
3162 BookAction::Add,
3163 BookOrder::new(
3164 OrderSide::Buy,
3165 Price::from("99.00"),
3166 Quantity::from("1.0"),
3167 0,
3168 ),
3169 0,
3170 0,
3171 ts_init,
3172 ts_init,
3173 ),
3174 OrderBookDelta::new(
3175 instrument_id,
3176 BookAction::Add,
3177 BookOrder::new(
3178 OrderSide::Sell,
3179 Price::from("101.00"),
3180 Quantity::from("5.0"),
3181 0,
3182 ),
3183 0,
3184 0,
3185 ts_init,
3186 ts_init,
3187 ),
3188 ];
3189 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3190 .unwrap();
3191
3192 let crossed_deltas = vec![OrderBookDelta::new(
3194 instrument_id,
3195 BookAction::Add,
3196 BookOrder::new(
3197 OrderSide::Buy,
3198 Price::from("102.00"),
3199 Quantity::from("2.0"),
3200 0,
3201 ),
3202 0,
3203 0,
3204 ts_init,
3205 ts_init,
3206 )];
3207 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3208
3209 let resolved =
3210 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3211 .unwrap();
3212
3213 assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3215 assert!(book.best_ask_size().unwrap().as_f64() < 5.0); assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); assert!(resolved.deltas.len() > 1);
3221 }
3222
3223 #[rstest]
3224 fn test_resolve_crossed_order_book_equal_sizes() {
3225 let instrument = create_test_instrument_any();
3228 let instrument_id = instrument.id();
3229 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3230 let ts_init = get_atomic_clock_realtime().get_time_ns();
3231
3232 let initial_deltas = vec![
3234 OrderBookDelta::new(
3235 instrument_id,
3236 BookAction::Add,
3237 BookOrder::new(
3238 OrderSide::Buy,
3239 Price::from("99.00"),
3240 Quantity::from("1.0"),
3241 0,
3242 ),
3243 0,
3244 0,
3245 ts_init,
3246 ts_init,
3247 ),
3248 OrderBookDelta::new(
3249 instrument_id,
3250 BookAction::Add,
3251 BookOrder::new(
3252 OrderSide::Sell,
3253 Price::from("101.00"),
3254 Quantity::from("3.0"),
3255 0,
3256 ),
3257 0,
3258 0,
3259 ts_init,
3260 ts_init,
3261 ),
3262 ];
3263 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3264 .unwrap();
3265
3266 let crossed_deltas = vec![OrderBookDelta::new(
3268 instrument_id,
3269 BookAction::Add,
3270 BookOrder::new(
3271 OrderSide::Buy,
3272 Price::from("102.00"),
3273 Quantity::from("3.0"),
3274 0,
3275 ),
3276 0,
3277 0,
3278 ts_init,
3279 ts_init,
3280 )];
3281 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3282
3283 let resolved =
3284 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3285 .unwrap();
3286
3287 assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); if let Some(ask_price) = book.best_ask_price() {
3291 assert!(ask_price > book.best_bid_price().unwrap()); }
3293
3294 assert!(resolved.deltas.len() > 1);
3296 }
3297
3298 #[rstest]
3299 fn test_resolve_crossed_order_book_multiple_iterations() {
3300 let instrument = create_test_instrument_any();
3302 let instrument_id = instrument.id();
3303 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3304 let ts_init = get_atomic_clock_realtime().get_time_ns();
3305
3306 let initial_deltas = vec![
3308 OrderBookDelta::new(
3309 instrument_id,
3310 BookAction::Add,
3311 BookOrder::new(
3312 OrderSide::Buy,
3313 Price::from("98.00"),
3314 Quantity::from("1.0"),
3315 0,
3316 ),
3317 0,
3318 0,
3319 ts_init,
3320 ts_init,
3321 ),
3322 OrderBookDelta::new(
3323 instrument_id,
3324 BookAction::Add,
3325 BookOrder::new(
3326 OrderSide::Sell,
3327 Price::from("100.00"),
3328 Quantity::from("1.0"),
3329 0,
3330 ),
3331 0,
3332 0,
3333 ts_init,
3334 ts_init,
3335 ),
3336 OrderBookDelta::new(
3337 instrument_id,
3338 BookAction::Add,
3339 BookOrder::new(
3340 OrderSide::Sell,
3341 Price::from("101.00"),
3342 Quantity::from("1.0"),
3343 0,
3344 ),
3345 0,
3346 0,
3347 ts_init,
3348 ts_init,
3349 ),
3350 ];
3351 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3352 .unwrap();
3353
3354 let crossed_deltas = vec![
3356 OrderBookDelta::new(
3357 instrument_id,
3358 BookAction::Add,
3359 BookOrder::new(
3360 OrderSide::Buy,
3361 Price::from("102.00"),
3362 Quantity::from("1.0"),
3363 0,
3364 ),
3365 0,
3366 0,
3367 ts_init,
3368 ts_init,
3369 ),
3370 OrderBookDelta::new(
3371 instrument_id,
3372 BookAction::Add,
3373 BookOrder::new(
3374 OrderSide::Buy,
3375 Price::from("103.00"),
3376 Quantity::from("1.0"),
3377 0,
3378 ),
3379 0,
3380 0,
3381 ts_init,
3382 ts_init,
3383 ),
3384 ];
3385 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3386
3387 let resolved =
3388 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3389 .unwrap();
3390
3391 if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3393 assert!(ask_price > bid_price, "Book should be uncrossed");
3394 }
3395
3396 assert!(resolved.deltas.len() > 2); }
3399
3400 #[rstest]
3401 fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3402 let instrument = create_test_instrument_any();
3404 let instrument_id = instrument.id();
3405 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3406 let ts_init = get_atomic_clock_realtime().get_time_ns();
3407
3408 let initial_deltas = vec![
3410 OrderBookDelta::new(
3411 instrument_id,
3412 BookAction::Add,
3413 BookOrder::new(
3414 OrderSide::Buy,
3415 Price::from("99.00"),
3416 Quantity::from("1.0"),
3417 0,
3418 ),
3419 0,
3420 0,
3421 ts_init,
3422 ts_init,
3423 ),
3424 OrderBookDelta::new(
3425 instrument_id,
3426 BookAction::Add,
3427 BookOrder::new(
3428 OrderSide::Sell,
3429 Price::from("101.00"),
3430 Quantity::from("1.0"),
3431 0,
3432 ),
3433 0,
3434 0,
3435 ts_init,
3436 ts_init,
3437 ),
3438 ];
3439 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3440 .unwrap();
3441
3442 let new_deltas = vec![OrderBookDelta::new(
3444 instrument_id,
3445 BookAction::Add,
3446 BookOrder::new(
3447 OrderSide::Buy,
3448 Price::from("98.50"),
3449 Quantity::from("2.0"),
3450 0,
3451 ),
3452 0,
3453 0,
3454 ts_init,
3455 ts_init,
3456 )];
3457 let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3458
3459 let original_bid = book.best_bid_price();
3460 let original_ask = book.best_ask_price();
3461
3462 let resolved =
3463 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3464 .unwrap();
3465
3466 assert_eq!(resolved.deltas.len(), new_deltas.len());
3468 assert_eq!(book.best_bid_price(), original_bid);
3469 assert_eq!(book.best_ask_price(), original_ask);
3470 assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3471 }
3472
3473 #[tokio::test]
3474 async fn test_request_instruments_successful_fetch() {
3475 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3477 set_data_event_sender(sender);
3478
3479 let client_id = ClientId::from("DYDX-TEST");
3480 let config = DydxDataClientConfig::default();
3481 let http_client = DydxHttpClient::default();
3482 let client =
3483 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3484
3485 let request = RequestInstruments::new(
3486 None,
3487 None,
3488 Some(client_id),
3489 Some(*DYDX_VENUE),
3490 UUID4::new(),
3491 get_atomic_clock_realtime().get_time_ns(),
3492 None,
3493 );
3494
3495 assert!(client.request_instruments(&request).is_ok());
3497
3498 let timeout = tokio::time::Duration::from_secs(5);
3500 let result = tokio::time::timeout(timeout, rx.recv()).await;
3501
3502 match result {
3503 Ok(Some(DataEvent::Response(resp))) => {
3504 if let DataResponse::Instruments(inst_resp) = resp {
3505 assert_eq!(inst_resp.correlation_id, request.request_id);
3507 assert_eq!(inst_resp.client_id, client_id);
3508 assert_eq!(inst_resp.venue, *DYDX_VENUE);
3509 assert!(inst_resp.start.is_none());
3510 assert!(inst_resp.end.is_none());
3511 }
3513 }
3514 Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3515 Ok(None) => panic!("Channel closed unexpectedly"),
3516 Err(_) => {
3517 println!("Test timed out - testnet may be unreachable");
3519 }
3520 }
3521 }
3522
3523 #[tokio::test]
3524 async fn test_request_instruments_empty_response_on_http_error() {
3525 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3527 set_data_event_sender(sender);
3528
3529 let client_id = ClientId::from("DYDX-ERROR-TEST");
3530 let config = DydxDataClientConfig {
3531 base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3532 ..Default::default()
3533 };
3534 let http_client = DydxHttpClient::new(
3535 config.base_url_http.clone(),
3536 config.http_timeout_secs,
3537 config.http_proxy_url.clone(),
3538 config.is_testnet,
3539 None,
3540 )
3541 .unwrap();
3542
3543 let client =
3544 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3545
3546 let request = RequestInstruments::new(
3547 None,
3548 None,
3549 Some(client_id),
3550 Some(*DYDX_VENUE),
3551 UUID4::new(),
3552 get_atomic_clock_realtime().get_time_ns(),
3553 None,
3554 );
3555
3556 assert!(client.request_instruments(&request).is_ok());
3557
3558 let timeout = tokio::time::Duration::from_secs(3);
3560 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3561 tokio::time::timeout(timeout, rx.recv()).await
3562 {
3563 assert!(
3564 resp.data.is_empty(),
3565 "Expected empty instruments on HTTP error"
3566 );
3567 assert_eq!(resp.correlation_id, request.request_id);
3568 assert_eq!(resp.client_id, client_id);
3569 }
3570 }
3571
3572 #[tokio::test]
3573 async fn test_request_instruments_caching() {
3574 setup_test_env();
3576
3577 let client_id = ClientId::from("DYDX-CACHE-TEST");
3578 let config = DydxDataClientConfig::default();
3579 let http_client = DydxHttpClient::default();
3580 let client =
3581 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3582
3583 let initial_cache_size = client.instruments.len();
3584
3585 let request = RequestInstruments::new(
3586 None,
3587 None,
3588 Some(client_id),
3589 Some(*DYDX_VENUE),
3590 UUID4::new(),
3591 get_atomic_clock_realtime().get_time_ns(),
3592 None,
3593 );
3594
3595 assert!(client.request_instruments(&request).is_ok());
3596
3597 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3599
3600 let final_cache_size = client.instruments.len();
3602 assert!(final_cache_size >= initial_cache_size);
3605 }
3606
3607 #[tokio::test]
3608 async fn test_request_instruments_correlation_id_matching() {
3609 setup_test_env();
3611
3612 let client_id = ClientId::from("DYDX-CORR-TEST");
3613 let config = DydxDataClientConfig::default();
3614 let http_client = DydxHttpClient::default();
3615 let client =
3616 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3617
3618 let request_id = UUID4::new();
3619 let request = RequestInstruments::new(
3620 None,
3621 None,
3622 Some(client_id),
3623 Some(*DYDX_VENUE),
3624 request_id,
3625 get_atomic_clock_realtime().get_time_ns(),
3626 None,
3627 );
3628
3629 assert!(client.request_instruments(&request).is_ok());
3631 }
3632
3633 #[tokio::test]
3634 async fn test_request_instruments_venue_assignment() {
3635 setup_test_env();
3637
3638 let client_id = ClientId::from("DYDX-VENUE-TEST");
3639 let config = DydxDataClientConfig::default();
3640 let http_client = DydxHttpClient::default();
3641 let client =
3642 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3643
3644 assert_eq!(client.venue(), *DYDX_VENUE);
3645
3646 let request = RequestInstruments::new(
3647 None,
3648 None,
3649 Some(client_id),
3650 Some(*DYDX_VENUE),
3651 UUID4::new(),
3652 get_atomic_clock_realtime().get_time_ns(),
3653 None,
3654 );
3655
3656 assert!(client.request_instruments(&request).is_ok());
3657 }
3658
3659 #[tokio::test]
3660 async fn test_request_instruments_timestamp_handling() {
3661 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3663 set_data_event_sender(sender);
3664
3665 let client_id = ClientId::from("DYDX-TS-TEST");
3666 let config = DydxDataClientConfig::default();
3667 let http_client = DydxHttpClient::default();
3668 let client =
3669 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3670
3671 let now = Utc::now();
3672 let start = Some(now - chrono::Duration::hours(24));
3673 let end = Some(now);
3674
3675 let request = RequestInstruments::new(
3676 start,
3677 end,
3678 Some(client_id),
3679 Some(*DYDX_VENUE),
3680 UUID4::new(),
3681 get_atomic_clock_realtime().get_time_ns(),
3682 None,
3683 );
3684
3685 assert!(client.request_instruments(&request).is_ok());
3686
3687 let timeout = tokio::time::Duration::from_secs(3);
3689 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3690 tokio::time::timeout(timeout, rx.recv()).await
3691 {
3692 assert!(resp.start.unwrap() > 0);
3694 assert!(resp.end.unwrap() > 0);
3695 assert!(resp.start.unwrap() <= resp.end.unwrap());
3696 assert!(resp.ts_init > 0);
3697 }
3698 }
3699
3700 #[tokio::test]
3701 async fn test_request_instruments_with_start_only() {
3702 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3704 set_data_event_sender(sender);
3705
3706 let client_id = ClientId::from("DYDX-TS-START-ONLY");
3707 let config = DydxDataClientConfig::default();
3708 let http_client = DydxHttpClient::default();
3709 let client =
3710 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3711
3712 let now = Utc::now();
3713 let start = Some(now - chrono::Duration::hours(24));
3714
3715 let request = RequestInstruments::new(
3716 start,
3717 None,
3718 Some(client_id),
3719 Some(*DYDX_VENUE),
3720 UUID4::new(),
3721 get_atomic_clock_realtime().get_time_ns(),
3722 None,
3723 );
3724
3725 assert!(client.request_instruments(&request).is_ok());
3726
3727 let timeout = tokio::time::Duration::from_secs(3);
3728 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3729 tokio::time::timeout(timeout, rx.recv()).await
3730 {
3731 assert!(resp.start.is_some());
3732 assert!(resp.end.is_none());
3733 assert!(resp.ts_init > 0);
3734 }
3735 }
3736
3737 #[tokio::test]
3738 async fn test_request_instruments_with_end_only() {
3739 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3741 set_data_event_sender(sender);
3742
3743 let client_id = ClientId::from("DYDX-TS-END-ONLY");
3744 let config = DydxDataClientConfig::default();
3745 let http_client = DydxHttpClient::default();
3746 let client =
3747 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3748
3749 let now = Utc::now();
3750 let end = Some(now);
3751
3752 let request = RequestInstruments::new(
3753 None,
3754 end,
3755 Some(client_id),
3756 Some(*DYDX_VENUE),
3757 UUID4::new(),
3758 get_atomic_clock_realtime().get_time_ns(),
3759 None,
3760 );
3761
3762 assert!(client.request_instruments(&request).is_ok());
3763
3764 let timeout = tokio::time::Duration::from_secs(3);
3765 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3766 tokio::time::timeout(timeout, rx.recv()).await
3767 {
3768 assert!(resp.start.is_none());
3769 assert!(resp.end.is_some());
3770 assert!(resp.ts_init > 0);
3771 }
3772 }
3773
3774 #[tokio::test]
3775 async fn test_request_instruments_client_id_fallback() {
3776 setup_test_env();
3778
3779 let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3780 let config = DydxDataClientConfig::default();
3781 let http_client = DydxHttpClient::default();
3782 let client =
3783 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3784
3785 let request = RequestInstruments::new(
3786 None,
3787 None,
3788 None, Some(*DYDX_VENUE),
3790 UUID4::new(),
3791 get_atomic_clock_realtime().get_time_ns(),
3792 None,
3793 );
3794
3795 assert!(client.request_instruments(&request).is_ok());
3797 }
3798
3799 #[tokio::test]
3800 async fn test_request_instruments_with_params() {
3801 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3803 set_data_event_sender(sender);
3804
3805 let client_id = ClientId::from("DYDX-PARAMS-TEST");
3806 let config = DydxDataClientConfig::default();
3807 let http_client = DydxHttpClient::default();
3808 let client =
3809 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3810
3811 let mut params_map = IndexMap::new();
3813 params_map.insert("test_key".to_string(), "test_value".to_string());
3814
3815 let request = RequestInstruments::new(
3816 None,
3817 None,
3818 Some(client_id),
3819 Some(*DYDX_VENUE),
3820 UUID4::new(),
3821 get_atomic_clock_realtime().get_time_ns(),
3822 Some(params_map),
3823 );
3824
3825 assert!(client.request_instruments(&request).is_ok());
3826
3827 let timeout = tokio::time::Duration::from_secs(3);
3829 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3830 tokio::time::timeout(timeout, rx.recv()).await
3831 {
3832 assert_eq!(resp.client_id, client_id);
3834 let params = resp
3835 .params
3836 .expect("expected params to be present in InstrumentsResponse");
3837 assert_eq!(
3838 params.get("test_key").map(String::as_str),
3839 Some("test_value")
3840 );
3841 }
3842 }
3843
3844 #[tokio::test]
3845 async fn test_request_instruments_with_start_and_end_range() {
3846 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3848 set_data_event_sender(sender);
3849
3850 let client_id = ClientId::from("DYDX-START-END-RANGE");
3851 let config = DydxDataClientConfig::default();
3852 let http_client = DydxHttpClient::default();
3853 let client =
3854 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3855
3856 let now = Utc::now();
3857 let start = Some(now - chrono::Duration::hours(48));
3858 let end = Some(now - chrono::Duration::hours(24));
3859
3860 let request = RequestInstruments::new(
3861 start,
3862 end,
3863 Some(client_id),
3864 Some(*DYDX_VENUE),
3865 UUID4::new(),
3866 get_atomic_clock_realtime().get_time_ns(),
3867 None,
3868 );
3869
3870 assert!(client.request_instruments(&request).is_ok());
3871
3872 let timeout = tokio::time::Duration::from_secs(3);
3873 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3874 tokio::time::timeout(timeout, rx.recv()).await
3875 {
3876 assert!(
3878 resp.start.is_some(),
3879 "start timestamp should be present when provided"
3880 );
3881 assert!(
3882 resp.end.is_some(),
3883 "end timestamp should be present when provided"
3884 );
3885 assert!(resp.ts_init > 0, "ts_init should always be set");
3886
3887 if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3889 assert!(
3890 start_ts < end_ts,
3891 "start timestamp should be before end timestamp"
3892 );
3893 }
3894 }
3895 }
3896
3897 #[tokio::test]
3898 async fn test_request_instruments_different_client_ids() {
3899 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3901 set_data_event_sender(sender);
3902
3903 let timeout = tokio::time::Duration::from_secs(3);
3904
3905 let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3907 let config1 = DydxDataClientConfig::default();
3908 let http_client1 = DydxHttpClient::default();
3909 let client1 =
3910 DydxDataClient::new(client_id_1, config1, http_client1, create_test_ws_client())
3911 .unwrap();
3912
3913 let request1 = RequestInstruments::new(
3914 None,
3915 None,
3916 Some(client_id_1),
3917 Some(*DYDX_VENUE),
3918 UUID4::new(),
3919 get_atomic_clock_realtime().get_time_ns(),
3920 None,
3921 );
3922
3923 assert!(client1.request_instruments(&request1).is_ok());
3924
3925 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3926 tokio::time::timeout(timeout, rx.recv()).await
3927 {
3928 assert_eq!(
3929 resp1.client_id, client_id_1,
3930 "Response should contain client_id_1"
3931 );
3932 }
3933
3934 let client_id_2 = ClientId::from("DYDX-CLIENT-2");
3936 let config2 = DydxDataClientConfig::default();
3937 let http_client2 = DydxHttpClient::default();
3938 let client2 =
3939 DydxDataClient::new(client_id_2, config2, http_client2, create_test_ws_client())
3940 .unwrap();
3941
3942 let request2 = RequestInstruments::new(
3943 None,
3944 None,
3945 Some(client_id_2),
3946 Some(*DYDX_VENUE),
3947 UUID4::new(),
3948 get_atomic_clock_realtime().get_time_ns(),
3949 None,
3950 );
3951
3952 assert!(client2.request_instruments(&request2).is_ok());
3953
3954 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
3955 tokio::time::timeout(timeout, rx.recv()).await
3956 {
3957 assert_eq!(
3958 resp2.client_id, client_id_2,
3959 "Response should contain client_id_2"
3960 );
3961 assert_ne!(
3962 resp2.client_id, client_id_1,
3963 "Different clients should have different client_ids"
3964 );
3965 }
3966 }
3967
3968 #[tokio::test]
3969 async fn test_request_instruments_no_timestamps() {
3970 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3972 set_data_event_sender(sender);
3973
3974 let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
3975 let config = DydxDataClientConfig::default();
3976 let http_client = DydxHttpClient::default();
3977 let client =
3978 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
3979
3980 let request = RequestInstruments::new(
3981 None, None, Some(client_id),
3984 Some(*DYDX_VENUE),
3985 UUID4::new(),
3986 get_atomic_clock_realtime().get_time_ns(),
3987 None,
3988 );
3989
3990 assert!(client.request_instruments(&request).is_ok());
3991
3992 let timeout = tokio::time::Duration::from_secs(5);
3993 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3994 tokio::time::timeout(timeout, rx.recv()).await
3995 {
3996 assert!(
3998 resp.start.is_none(),
3999 "start should be None when not provided"
4000 );
4001 assert!(resp.end.is_none(), "end should be None when not provided");
4002
4003 assert_eq!(resp.venue, *DYDX_VENUE);
4005 assert_eq!(resp.client_id, client_id);
4006 assert!(resp.ts_init > 0);
4007 }
4008 }
4009
4010 #[tokio::test]
4011 async fn test_request_instrument_cache_hit() {
4012 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4014 set_data_event_sender(sender);
4015
4016 let client_id = ClientId::from("DYDX-CACHE-HIT");
4017 let config = DydxDataClientConfig::default();
4018 let http_client = DydxHttpClient::default();
4019 let client =
4020 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4021
4022 let instrument = create_test_instrument_any();
4024 let instrument_id = instrument.id();
4025 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4026 client.instruments.insert(symbol_key, instrument.clone());
4027
4028 let request = RequestInstrument::new(
4029 instrument_id,
4030 None,
4031 None,
4032 Some(client_id),
4033 UUID4::new(),
4034 get_atomic_clock_realtime().get_time_ns(),
4035 None,
4036 );
4037
4038 assert!(client.request_instrument(&request).is_ok());
4039
4040 let timeout = tokio::time::Duration::from_millis(500);
4042 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4043 tokio::time::timeout(timeout, rx.recv()).await
4044 {
4045 assert_eq!(resp.instrument_id, instrument_id);
4046 assert_eq!(resp.client_id, client_id);
4047 assert_eq!(resp.data.id(), instrument_id);
4048 }
4049 }
4050
4051 #[tokio::test]
4052 async fn test_request_instrument_cache_miss() {
4053 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4055 set_data_event_sender(sender);
4056
4057 let client_id = ClientId::from("DYDX-CACHE-MISS");
4058 let config = DydxDataClientConfig::default();
4059 let http_client = DydxHttpClient::default();
4060 let client =
4061 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4062
4063 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4064
4065 let request = RequestInstrument::new(
4066 instrument_id,
4067 None,
4068 None,
4069 Some(client_id),
4070 UUID4::new(),
4071 get_atomic_clock_realtime().get_time_ns(),
4072 None,
4073 );
4074
4075 assert!(client.request_instrument(&request).is_ok());
4076
4077 let timeout = tokio::time::Duration::from_secs(5);
4079 let result = tokio::time::timeout(timeout, rx.recv()).await;
4080
4081 match result {
4083 Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4084 assert_eq!(resp.instrument_id, instrument_id);
4085 assert_eq!(resp.client_id, client_id);
4086 }
4087 Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4088 Ok(None) => panic!("Channel closed unexpectedly"),
4089 Err(_) => {
4090 println!("Test timed out - testnet may be unreachable");
4091 }
4092 }
4093 }
4094
4095 #[tokio::test]
4096 async fn test_request_instrument_not_found() {
4097 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4099 set_data_event_sender(sender);
4100
4101 let client_id = ClientId::from("DYDX-NOT-FOUND");
4102 let config = DydxDataClientConfig {
4103 base_url_http: Some("http://invalid-url.local".to_string()),
4104 ..Default::default()
4105 };
4106 let http_client = DydxHttpClient::new(
4107 config.base_url_http.clone(),
4108 config.http_timeout_secs,
4109 config.http_proxy_url.clone(),
4110 config.is_testnet,
4111 None,
4112 )
4113 .unwrap();
4114
4115 let client =
4116 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4117
4118 let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4119
4120 let request = RequestInstrument::new(
4121 instrument_id,
4122 None,
4123 None,
4124 Some(client_id),
4125 UUID4::new(),
4126 get_atomic_clock_realtime().get_time_ns(),
4127 None,
4128 );
4129
4130 assert!(client.request_instrument(&request).is_ok());
4132
4133 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4135 }
4136
4137 #[tokio::test]
4138 async fn test_request_instrument_bulk_caching() {
4139 setup_test_env();
4141
4142 let client_id = ClientId::from("DYDX-BULK-CACHE");
4143 let config = DydxDataClientConfig::default();
4144 let http_client = DydxHttpClient::default();
4145 let client =
4146 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4147
4148 let initial_cache_size = client.instruments.len();
4149
4150 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4151
4152 let request = RequestInstrument::new(
4153 instrument_id,
4154 None,
4155 None,
4156 Some(client_id),
4157 UUID4::new(),
4158 get_atomic_clock_realtime().get_time_ns(),
4159 None,
4160 );
4161
4162 assert!(client.request_instrument(&request).is_ok());
4163
4164 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4166
4167 let final_cache_size = client.instruments.len();
4169 assert!(final_cache_size >= initial_cache_size);
4170 }
4172
4173 #[tokio::test]
4174 async fn test_request_instrument_correlation_id() {
4175 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4177 set_data_event_sender(sender);
4178
4179 let client_id = ClientId::from("DYDX-CORR-ID");
4180 let config = DydxDataClientConfig::default();
4181 let http_client = DydxHttpClient::default();
4182 let client =
4183 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4184
4185 let instrument = create_test_instrument_any();
4187 let instrument_id = instrument.id();
4188 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4189 client.instruments.insert(symbol_key, instrument.clone());
4190
4191 let request_id = UUID4::new();
4192 let request = RequestInstrument::new(
4193 instrument_id,
4194 None,
4195 None,
4196 Some(client_id),
4197 request_id,
4198 get_atomic_clock_realtime().get_time_ns(),
4199 None,
4200 );
4201
4202 assert!(client.request_instrument(&request).is_ok());
4203
4204 let timeout = tokio::time::Duration::from_millis(500);
4206 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4207 tokio::time::timeout(timeout, rx.recv()).await
4208 {
4209 assert_eq!(resp.correlation_id, request_id);
4210 }
4211 }
4212
4213 #[tokio::test]
4214 async fn test_request_instrument_response_format_boxed() {
4215 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4217 set_data_event_sender(sender);
4218
4219 let client_id = ClientId::from("DYDX-BOXED");
4220 let config = DydxDataClientConfig::default();
4221 let http_client = DydxHttpClient::default();
4222 let client =
4223 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4224
4225 let instrument = create_test_instrument_any();
4227 let instrument_id = instrument.id();
4228 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4229 client.instruments.insert(symbol_key, instrument.clone());
4230
4231 let request = RequestInstrument::new(
4232 instrument_id,
4233 None,
4234 None,
4235 Some(client_id),
4236 UUID4::new(),
4237 get_atomic_clock_realtime().get_time_ns(),
4238 None,
4239 );
4240
4241 assert!(client.request_instrument(&request).is_ok());
4242
4243 let timeout = tokio::time::Duration::from_millis(500);
4245 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4246 tokio::time::timeout(timeout, rx.recv()).await
4247 {
4248 assert_eq!(boxed_resp.instrument_id, instrument_id);
4250 assert_eq!(boxed_resp.client_id, client_id);
4251 assert!(boxed_resp.start.is_none());
4252 assert!(boxed_resp.end.is_none());
4253 assert!(boxed_resp.ts_init > 0);
4254 }
4255 }
4256
4257 #[rstest]
4258 fn test_request_instrument_symbol_extraction() {
4259 setup_test_env();
4261
4262 let client_id = ClientId::from("DYDX-SYMBOL");
4263 let config = DydxDataClientConfig::default();
4264 let http_client = DydxHttpClient::default();
4265 let _client =
4266 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4267
4268 let test_cases = vec![
4271 ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4272 ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4273 ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4274 ];
4275
4276 for (instrument_id_str, expected_symbol) in test_cases {
4277 let instrument_id = InstrumentId::from(instrument_id_str);
4278 let symbol = Ustr::from(instrument_id.symbol.as_str());
4279 assert_eq!(symbol.as_str(), expected_symbol);
4280 }
4281 }
4282
4283 #[tokio::test]
4284 async fn test_request_instrument_client_id_fallback() {
4285 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4287 set_data_event_sender(sender);
4288
4289 let client_id = ClientId::from("DYDX-FALLBACK");
4290 let config = DydxDataClientConfig::default();
4291 let http_client = DydxHttpClient::default();
4292 let client =
4293 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4294
4295 let instrument = create_test_instrument_any();
4297 let instrument_id = instrument.id();
4298 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4299 client.instruments.insert(symbol_key, instrument.clone());
4300
4301 let request = RequestInstrument::new(
4302 instrument_id,
4303 None,
4304 None,
4305 None, UUID4::new(),
4307 get_atomic_clock_realtime().get_time_ns(),
4308 None,
4309 );
4310
4311 assert!(client.request_instrument(&request).is_ok());
4312
4313 let timeout = tokio::time::Duration::from_millis(500);
4315 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4316 tokio::time::timeout(timeout, rx.recv()).await
4317 {
4318 assert_eq!(resp.client_id, client_id);
4319 }
4320 }
4321
4322 #[tokio::test]
4323 async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4324 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4325 set_data_event_sender(sender);
4326
4327 let created_at = Utc::now();
4328
4329 let http_trade = crate::http::models::Trade {
4330 id: "trade-1".to_string(),
4331 side: OrderSide::Buy,
4332 size: dec!(1.5),
4333 price: dec!(100.25),
4334 created_at,
4335 created_at_height: 1,
4336 trade_type: crate::common::enums::DydxTradeType::Limit,
4337 };
4338
4339 let trades_response = crate::http::models::TradesResponse {
4340 trades: vec![http_trade],
4341 };
4342
4343 let state = TradesTestState {
4344 response: Arc::new(trades_response),
4345 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4346 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4347 };
4348
4349 let addr = start_trades_test_server(state.clone()).await;
4350 let base_url = format!("http://{addr}");
4351
4352 let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4353 let config = DydxDataClientConfig {
4354 base_url_http: Some(base_url),
4355 is_testnet: true,
4356 ..Default::default()
4357 };
4358
4359 let http_client = DydxHttpClient::new(
4360 config.base_url_http.clone(),
4361 config.http_timeout_secs,
4362 config.http_proxy_url.clone(),
4363 config.is_testnet,
4364 None,
4365 )
4366 .unwrap();
4367
4368 let client =
4369 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4370
4371 let instrument = create_test_instrument_any();
4372 let instrument_id = instrument.id();
4373 let price_precision = instrument.price_precision();
4374 let size_precision = instrument.size_precision();
4375 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4376 client.instruments.insert(symbol_key, instrument);
4377
4378 let request_id = UUID4::new();
4379 let now = Utc::now();
4380 let start = Some(now - chrono::Duration::seconds(10));
4381 let end = Some(now + chrono::Duration::seconds(10));
4382 let limit = std::num::NonZeroUsize::new(100).unwrap();
4383
4384 let request = RequestTrades::new(
4385 instrument_id,
4386 start,
4387 end,
4388 Some(limit),
4389 Some(client_id),
4390 request_id,
4391 get_atomic_clock_realtime().get_time_ns(),
4392 None,
4393 );
4394
4395 assert!(client.request_trades(&request).is_ok());
4396
4397 let timeout = tokio::time::Duration::from_secs(1);
4398 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4399 tokio::time::timeout(timeout, rx.recv()).await
4400 {
4401 assert_eq!(resp.correlation_id, request_id);
4402 assert_eq!(resp.client_id, client_id);
4403 assert_eq!(resp.instrument_id, instrument_id);
4404 assert_eq!(resp.data.len(), 1);
4405
4406 let tick = &resp.data[0];
4407 assert_eq!(tick.instrument_id, instrument_id);
4408 assert_eq!(tick.price, Price::new(100.25, price_precision));
4409 assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4410 assert_eq!(tick.trade_id.to_string(), "trade-1");
4411 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4412 } else {
4413 panic!("did not receive trades response in time");
4414 }
4415
4416 let last_ticker = state.last_ticker.lock().await.clone();
4418 assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4419
4420 let last_limit = *state.last_limit.lock().await;
4421 assert_eq!(last_limit, Some(Some(100)));
4422 }
4423
4424 #[tokio::test]
4425 async fn test_request_trades_empty_response_and_no_limit() {
4426 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4427 set_data_event_sender(sender);
4428
4429 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4430
4431 let state = TradesTestState {
4432 response: Arc::new(trades_response),
4433 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4434 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4435 };
4436
4437 let addr = start_trades_test_server(state.clone()).await;
4438 let base_url = format!("http://{addr}");
4439
4440 let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4441 let config = DydxDataClientConfig {
4442 base_url_http: Some(base_url),
4443 is_testnet: true,
4444 ..Default::default()
4445 };
4446
4447 let http_client = DydxHttpClient::new(
4448 config.base_url_http.clone(),
4449 config.http_timeout_secs,
4450 config.http_proxy_url.clone(),
4451 config.is_testnet,
4452 None,
4453 )
4454 .unwrap();
4455
4456 let client =
4457 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4458
4459 let instrument = create_test_instrument_any();
4460 let instrument_id = instrument.id();
4461 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4462 client.instruments.insert(symbol_key, instrument);
4463
4464 let request_id = UUID4::new();
4465
4466 let request = RequestTrades::new(
4467 instrument_id,
4468 None,
4469 None,
4470 None, Some(client_id),
4472 request_id,
4473 get_atomic_clock_realtime().get_time_ns(),
4474 None,
4475 );
4476
4477 assert!(client.request_trades(&request).is_ok());
4478
4479 let timeout = tokio::time::Duration::from_secs(1);
4480 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4481 tokio::time::timeout(timeout, rx.recv()).await
4482 {
4483 assert_eq!(resp.correlation_id, request_id);
4484 assert_eq!(resp.client_id, client_id);
4485 assert_eq!(resp.instrument_id, instrument_id);
4486 assert!(resp.data.is_empty());
4487 } else {
4488 panic!("did not receive trades response in time");
4489 }
4490
4491 let last_limit = *state.last_limit.lock().await;
4493 assert_eq!(last_limit, Some(None));
4494 }
4495
4496 #[tokio::test]
4497 async fn test_request_trades_timestamp_filtering() {
4498 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4499 set_data_event_sender(sender);
4500
4501 let now = Utc::now();
4502 let trade_before = crate::http::models::Trade {
4503 id: "before".to_string(),
4504 side: OrderSide::Buy,
4505 size: dec!(1.0),
4506 price: dec!(100.0),
4507 created_at: now - chrono::Duration::seconds(60),
4508 created_at_height: 1,
4509 trade_type: crate::common::enums::DydxTradeType::Limit,
4510 };
4511 let trade_inside = crate::http::models::Trade {
4512 id: "inside".to_string(),
4513 side: OrderSide::Sell,
4514 size: dec!(2.0),
4515 price: dec!(101.0),
4516 created_at: now,
4517 created_at_height: 2,
4518 trade_type: crate::common::enums::DydxTradeType::Limit,
4519 };
4520 let trade_after = crate::http::models::Trade {
4521 id: "after".to_string(),
4522 side: OrderSide::Buy,
4523 size: dec!(3.0),
4524 price: dec!(102.0),
4525 created_at: now + chrono::Duration::seconds(60),
4526 created_at_height: 3,
4527 trade_type: crate::common::enums::DydxTradeType::Limit,
4528 };
4529
4530 let trades_response = crate::http::models::TradesResponse {
4531 trades: vec![trade_before, trade_inside.clone(), trade_after],
4532 };
4533
4534 let state = TradesTestState {
4535 response: Arc::new(trades_response),
4536 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4537 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4538 };
4539
4540 let addr = start_trades_test_server(state).await;
4541 let base_url = format!("http://{addr}");
4542
4543 let client_id = ClientId::from("DYDX-TRADES-FILTER");
4544 let config = DydxDataClientConfig {
4545 base_url_http: Some(base_url),
4546 is_testnet: true,
4547 ..Default::default()
4548 };
4549
4550 let http_client = DydxHttpClient::new(
4551 config.base_url_http.clone(),
4552 config.http_timeout_secs,
4553 config.http_proxy_url.clone(),
4554 config.is_testnet,
4555 None,
4556 )
4557 .unwrap();
4558
4559 let client =
4560 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4561
4562 let instrument = create_test_instrument_any();
4563 let instrument_id = instrument.id();
4564 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4565 client.instruments.insert(symbol_key, instrument);
4566
4567 let request_id = UUID4::new();
4568
4569 let start = Some(now - chrono::Duration::seconds(10));
4571 let end = Some(now + chrono::Duration::seconds(10));
4572
4573 let request = RequestTrades::new(
4574 instrument_id,
4575 start,
4576 end,
4577 None,
4578 Some(client_id),
4579 request_id,
4580 get_atomic_clock_realtime().get_time_ns(),
4581 None,
4582 );
4583
4584 assert!(client.request_trades(&request).is_ok());
4585
4586 let timeout = tokio::time::Duration::from_secs(1);
4587 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4588 tokio::time::timeout(timeout, rx.recv()).await
4589 {
4590 assert_eq!(resp.correlation_id, request_id);
4591 assert_eq!(resp.client_id, client_id);
4592 assert_eq!(resp.instrument_id, instrument_id);
4593 assert_eq!(resp.data.len(), 1);
4594
4595 let tick = &resp.data[0];
4596 assert_eq!(tick.trade_id.to_string(), "inside");
4597 assert_eq!(tick.price.as_decimal(), dec!(101.0));
4598 } else {
4599 panic!("did not receive trades response in time");
4600 }
4601 }
4602
4603 #[tokio::test]
4604 async fn test_request_trades_correlation_id_matching() {
4605 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4607 set_data_event_sender(sender);
4608
4609 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4610
4611 let state = TradesTestState {
4612 response: Arc::new(trades_response),
4613 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4614 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4615 };
4616
4617 let addr = start_trades_test_server(state).await;
4618 let base_url = format!("http://{addr}");
4619
4620 let client_id = ClientId::from("DYDX-TRADES-CORR");
4621 let config = DydxDataClientConfig {
4622 base_url_http: Some(base_url),
4623 is_testnet: true,
4624 ..Default::default()
4625 };
4626
4627 let http_client = DydxHttpClient::new(
4628 config.base_url_http.clone(),
4629 config.http_timeout_secs,
4630 config.http_proxy_url.clone(),
4631 config.is_testnet,
4632 None,
4633 )
4634 .unwrap();
4635
4636 let client =
4637 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4638
4639 let instrument = create_test_instrument_any();
4640 let instrument_id = instrument.id();
4641 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4642 client.instruments.insert(symbol_key, instrument);
4643
4644 let request_id = UUID4::new();
4645 let request = RequestTrades::new(
4646 instrument_id,
4647 None,
4648 None,
4649 None,
4650 Some(client_id),
4651 request_id,
4652 get_atomic_clock_realtime().get_time_ns(),
4653 None,
4654 );
4655
4656 assert!(client.request_trades(&request).is_ok());
4657
4658 let timeout = tokio::time::Duration::from_millis(500);
4659 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4660 tokio::time::timeout(timeout, rx.recv()).await
4661 {
4662 assert_eq!(resp.correlation_id, request_id);
4663 }
4664 }
4665
4666 #[tokio::test]
4667 async fn test_request_trades_response_format() {
4668 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4670 set_data_event_sender(sender);
4671
4672 let created_at = Utc::now();
4673 let http_trade = crate::http::models::Trade {
4674 id: "format-test".to_string(),
4675 side: OrderSide::Sell,
4676 size: dec!(5.0),
4677 price: dec!(200.0),
4678 created_at,
4679 created_at_height: 100,
4680 trade_type: crate::common::enums::DydxTradeType::Limit,
4681 };
4682
4683 let trades_response = crate::http::models::TradesResponse {
4684 trades: vec![http_trade],
4685 };
4686
4687 let state = TradesTestState {
4688 response: Arc::new(trades_response),
4689 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4690 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4691 };
4692
4693 let addr = start_trades_test_server(state).await;
4694 let base_url = format!("http://{addr}");
4695
4696 let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4697 let config = DydxDataClientConfig {
4698 base_url_http: Some(base_url),
4699 is_testnet: true,
4700 ..Default::default()
4701 };
4702
4703 let http_client = DydxHttpClient::new(
4704 config.base_url_http.clone(),
4705 config.http_timeout_secs,
4706 config.http_proxy_url.clone(),
4707 config.is_testnet,
4708 None,
4709 )
4710 .unwrap();
4711
4712 let client =
4713 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4714
4715 let instrument = create_test_instrument_any();
4716 let instrument_id = instrument.id();
4717 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4718 client.instruments.insert(symbol_key, instrument);
4719
4720 let request = RequestTrades::new(
4721 instrument_id,
4722 None,
4723 None,
4724 None,
4725 Some(client_id),
4726 UUID4::new(),
4727 get_atomic_clock_realtime().get_time_ns(),
4728 None,
4729 );
4730
4731 assert!(client.request_trades(&request).is_ok());
4732
4733 let timeout = tokio::time::Duration::from_millis(500);
4734 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4735 tokio::time::timeout(timeout, rx.recv()).await
4736 {
4737 assert_eq!(resp.client_id, client_id);
4739 assert_eq!(resp.instrument_id, instrument_id);
4740 assert!(resp.data.len() == 1);
4741 assert!(resp.ts_init > 0);
4742
4743 let tick = &resp.data[0];
4745 assert_eq!(tick.instrument_id, instrument_id);
4746 assert!(tick.ts_event > 0);
4747 assert!(tick.ts_init > 0);
4748 }
4749 }
4750
4751 #[tokio::test]
4752 async fn test_request_trades_no_instrument_in_cache() {
4753 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4755 set_data_event_sender(sender);
4756
4757 let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4758 let config = DydxDataClientConfig::default();
4759 let http_client = DydxHttpClient::default();
4760 let client =
4761 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4762
4763 let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4765
4766 let request = RequestTrades::new(
4767 instrument_id,
4768 None,
4769 None,
4770 None,
4771 Some(client_id),
4772 UUID4::new(),
4773 get_atomic_clock_realtime().get_time_ns(),
4774 None,
4775 );
4776
4777 assert!(client.request_trades(&request).is_ok());
4778
4779 let timeout = tokio::time::Duration::from_millis(500);
4781 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4782 tokio::time::timeout(timeout, rx.recv()).await
4783 {
4784 assert!(resp.data.is_empty());
4785 }
4786 }
4787
4788 #[tokio::test]
4789 async fn test_request_trades_limit_parameter() {
4790 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4792 set_data_event_sender(sender);
4793
4794 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4795
4796 let state = TradesTestState {
4797 response: Arc::new(trades_response),
4798 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4799 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4800 };
4801
4802 let addr = start_trades_test_server(state.clone()).await;
4803 let base_url = format!("http://{addr}");
4804
4805 let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4806 let config = DydxDataClientConfig {
4807 base_url_http: Some(base_url),
4808 is_testnet: true,
4809 ..Default::default()
4810 };
4811
4812 let http_client = DydxHttpClient::new(
4813 config.base_url_http.clone(),
4814 config.http_timeout_secs,
4815 config.http_proxy_url.clone(),
4816 config.is_testnet,
4817 None,
4818 )
4819 .unwrap();
4820
4821 let client =
4822 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4823
4824 let instrument = create_test_instrument_any();
4825 let instrument_id = instrument.id();
4826 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4827 client.instruments.insert(symbol_key, instrument);
4828
4829 let limit = std::num::NonZeroUsize::new(500).unwrap();
4831 let request = RequestTrades::new(
4832 instrument_id,
4833 None,
4834 None,
4835 Some(limit),
4836 Some(client_id),
4837 UUID4::new(),
4838 get_atomic_clock_realtime().get_time_ns(),
4839 None,
4840 );
4841
4842 assert!(client.request_trades(&request).is_ok());
4843
4844 let state_clone = state.clone();
4845 wait_until_async(
4846 || async { state_clone.last_limit.lock().await.is_some() },
4847 Duration::from_secs(5),
4848 )
4849 .await;
4850
4851 let last_limit = *state.last_limit.lock().await;
4853 assert_eq!(last_limit, Some(Some(500)));
4854 }
4855
4856 #[rstest]
4857 fn test_request_trades_symbol_conversion() {
4858 setup_test_env();
4860
4861 let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4862 let config = DydxDataClientConfig::default();
4863 let http_client = DydxHttpClient::default();
4864 let _client =
4865 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4866
4867 let test_cases = vec![
4869 ("BTC-USD-PERP.DYDX", "BTC-USD"),
4870 ("ETH-USD-PERP.DYDX", "ETH-USD"),
4871 ("SOL-USD-PERP.DYDX", "SOL-USD"),
4872 ];
4873
4874 for (instrument_id_str, expected_ticker) in test_cases {
4875 let instrument_id = InstrumentId::from(instrument_id_str);
4876 let ticker = instrument_id
4877 .symbol
4878 .as_str()
4879 .trim_end_matches("-PERP")
4880 .to_string();
4881 assert_eq!(ticker, expected_ticker);
4882 }
4883 }
4884
4885 #[tokio::test]
4886 async fn test_http_404_handling() {
4887 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4889 set_data_event_sender(sender);
4890
4891 let client_id = ClientId::from("DYDX-404");
4892 let config = DydxDataClientConfig {
4893 base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4894 http_timeout_secs: Some(1),
4895 ..Default::default()
4896 };
4897
4898 let http_client = DydxHttpClient::new(
4899 config.base_url_http.clone(),
4900 config.http_timeout_secs,
4901 config.http_proxy_url.clone(),
4902 config.is_testnet,
4903 None,
4904 )
4905 .unwrap();
4906
4907 let client =
4908 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4909
4910 let request = RequestInstruments::new(
4911 None,
4912 None,
4913 Some(client_id),
4914 Some(*DYDX_VENUE),
4915 UUID4::new(),
4916 get_atomic_clock_realtime().get_time_ns(),
4917 None,
4918 );
4919
4920 assert!(client.request_instruments(&request).is_ok());
4921
4922 let timeout = tokio::time::Duration::from_secs(2);
4924 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4925 tokio::time::timeout(timeout, rx.recv()).await
4926 {
4927 assert!(resp.data.is_empty(), "Expected empty response on 404");
4928 }
4929 }
4930
4931 #[tokio::test]
4932 async fn test_http_500_handling() {
4933 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4935 set_data_event_sender(sender);
4936
4937 let client_id = ClientId::from("DYDX-500");
4938 let config = DydxDataClientConfig {
4939 base_url_http: Some("http://httpstat.us/500".to_string()),
4940 http_timeout_secs: Some(2),
4941 ..Default::default()
4942 };
4943
4944 let http_client = DydxHttpClient::new(
4945 config.base_url_http.clone(),
4946 config.http_timeout_secs,
4947 config.http_proxy_url.clone(),
4948 config.is_testnet,
4949 None,
4950 )
4951 .unwrap();
4952
4953 let client =
4954 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
4955
4956 let instrument = create_test_instrument_any();
4957 let instrument_id = instrument.id();
4958 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4959 client.instruments.insert(symbol_key, instrument);
4960
4961 let request = RequestTrades::new(
4962 instrument_id,
4963 None,
4964 None,
4965 None,
4966 Some(client_id),
4967 UUID4::new(),
4968 get_atomic_clock_realtime().get_time_ns(),
4969 None,
4970 );
4971
4972 assert!(client.request_trades(&request).is_ok());
4973
4974 let timeout = tokio::time::Duration::from_secs(3);
4976 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4977 tokio::time::timeout(timeout, rx.recv()).await
4978 {
4979 assert!(resp.data.is_empty(), "Expected empty response on 500");
4980 }
4981 }
4982
4983 #[tokio::test]
4984 async fn test_network_timeout_handling() {
4985 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4987 set_data_event_sender(sender);
4988
4989 let client_id = ClientId::from("DYDX-TIMEOUT");
4990 let config = DydxDataClientConfig {
4991 base_url_http: Some("http://10.255.255.1:81".to_string()), http_timeout_secs: Some(1), ..Default::default()
4994 };
4995
4996 let http_client = DydxHttpClient::new(
4997 config.base_url_http.clone(),
4998 config.http_timeout_secs,
4999 config.http_proxy_url.clone(),
5000 config.is_testnet,
5001 None,
5002 )
5003 .unwrap();
5004
5005 let client =
5006 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5007
5008 let request = RequestInstruments::new(
5009 None,
5010 None,
5011 Some(client_id),
5012 Some(*DYDX_VENUE),
5013 UUID4::new(),
5014 get_atomic_clock_realtime().get_time_ns(),
5015 None,
5016 );
5017
5018 assert!(client.request_instruments(&request).is_ok());
5019
5020 let timeout = tokio::time::Duration::from_secs(3);
5022 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5023 tokio::time::timeout(timeout, rx.recv()).await
5024 {
5025 assert!(resp.data.is_empty(), "Expected empty response on timeout");
5026 }
5027 }
5028
5029 #[tokio::test]
5030 async fn test_connection_refused_handling() {
5031 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5033 set_data_event_sender(sender);
5034
5035 let client_id = ClientId::from("DYDX-REFUSED");
5036 let config = DydxDataClientConfig {
5037 base_url_http: Some("http://localhost:9999".to_string()), http_timeout_secs: Some(1),
5039 ..Default::default()
5040 };
5041
5042 let http_client = DydxHttpClient::new(
5043 config.base_url_http.clone(),
5044 config.http_timeout_secs,
5045 config.http_proxy_url.clone(),
5046 config.is_testnet,
5047 None,
5048 )
5049 .unwrap();
5050
5051 let client =
5052 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5053
5054 let instrument = create_test_instrument_any();
5055 let instrument_id = instrument.id();
5056 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5057 client.instruments.insert(symbol_key, instrument);
5058
5059 let request = RequestInstrument::new(
5060 instrument_id,
5061 None,
5062 None,
5063 Some(client_id),
5064 UUID4::new(),
5065 get_atomic_clock_realtime().get_time_ns(),
5066 None,
5067 );
5068
5069 assert!(client.request_instrument(&request).is_ok());
5070
5071 let timeout = tokio::time::Duration::from_secs(2);
5073 let result = tokio::time::timeout(timeout, rx.recv()).await;
5074
5075 match result {
5078 Ok(Some(DataEvent::Response(_))) => {
5079 }
5081 Ok(None) | Err(_) => {
5082 }
5084 _ => {}
5085 }
5086 }
5087
5088 #[tokio::test]
5089 async fn test_dns_resolution_failure_handling() {
5090 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5092 set_data_event_sender(sender);
5093
5094 let client_id = ClientId::from("DYDX-DNS");
5095 let config = DydxDataClientConfig {
5096 base_url_http: Some(
5097 "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5098 ),
5099 http_timeout_secs: Some(2),
5100 ..Default::default()
5101 };
5102
5103 let http_client = DydxHttpClient::new(
5104 config.base_url_http.clone(),
5105 config.http_timeout_secs,
5106 config.http_proxy_url.clone(),
5107 config.is_testnet,
5108 None,
5109 )
5110 .unwrap();
5111
5112 let client =
5113 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5114
5115 let request = RequestInstruments::new(
5116 None,
5117 None,
5118 Some(client_id),
5119 Some(*DYDX_VENUE),
5120 UUID4::new(),
5121 get_atomic_clock_realtime().get_time_ns(),
5122 None,
5123 );
5124
5125 assert!(client.request_instruments(&request).is_ok());
5126
5127 let timeout = tokio::time::Duration::from_secs(3);
5129 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5130 tokio::time::timeout(timeout, rx.recv()).await
5131 {
5132 assert!(
5133 resp.data.is_empty(),
5134 "Expected empty response on DNS failure"
5135 );
5136 }
5137 }
5138
5139 #[tokio::test]
5140 async fn test_http_502_503_handling() {
5141 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5143 set_data_event_sender(sender);
5144
5145 let client_id = ClientId::from("DYDX-503");
5146 let config = DydxDataClientConfig {
5147 base_url_http: Some("http://httpstat.us/503".to_string()),
5148 http_timeout_secs: Some(2),
5149 ..Default::default()
5150 };
5151
5152 let http_client = DydxHttpClient::new(
5153 config.base_url_http.clone(),
5154 config.http_timeout_secs,
5155 config.http_proxy_url.clone(),
5156 config.is_testnet,
5157 None,
5158 )
5159 .unwrap();
5160
5161 let client =
5162 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5163
5164 let request = RequestInstruments::new(
5165 None,
5166 None,
5167 Some(client_id),
5168 Some(*DYDX_VENUE),
5169 UUID4::new(),
5170 get_atomic_clock_realtime().get_time_ns(),
5171 None,
5172 );
5173
5174 assert!(client.request_instruments(&request).is_ok());
5175
5176 let timeout = tokio::time::Duration::from_secs(3);
5178 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5179 tokio::time::timeout(timeout, rx.recv()).await
5180 {
5181 assert!(resp.data.is_empty(), "Expected empty response on 503");
5182 }
5183 }
5184
5185 #[tokio::test]
5186 async fn test_http_429_rate_limit_handling() {
5187 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5189 set_data_event_sender(sender);
5190
5191 let client_id = ClientId::from("DYDX-429");
5192 let config = DydxDataClientConfig {
5193 base_url_http: Some("http://httpstat.us/429".to_string()),
5194 http_timeout_secs: Some(2),
5195 ..Default::default()
5196 };
5197
5198 let http_client = DydxHttpClient::new(
5199 config.base_url_http.clone(),
5200 config.http_timeout_secs,
5201 config.http_proxy_url.clone(),
5202 config.is_testnet,
5203 None,
5204 )
5205 .unwrap();
5206
5207 let client =
5208 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5209
5210 let instrument = create_test_instrument_any();
5211 let instrument_id = instrument.id();
5212 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5213 client.instruments.insert(symbol_key, instrument);
5214
5215 let request = RequestTrades::new(
5216 instrument_id,
5217 None,
5218 None,
5219 None,
5220 Some(client_id),
5221 UUID4::new(),
5222 get_atomic_clock_realtime().get_time_ns(),
5223 None,
5224 );
5225
5226 assert!(client.request_trades(&request).is_ok());
5227
5228 let timeout = tokio::time::Duration::from_secs(3);
5230 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5231 tokio::time::timeout(timeout, rx.recv()).await
5232 {
5233 assert!(
5234 resp.data.is_empty(),
5235 "Expected empty response on rate limit"
5236 );
5237 }
5238 }
5239
5240 #[tokio::test]
5241 async fn test_error_handling_does_not_panic() {
5242 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5244 set_data_event_sender(sender);
5245
5246 let client_id = ClientId::from("DYDX-NO-PANIC");
5247 let config = DydxDataClientConfig {
5248 base_url_http: Some("http://invalid".to_string()),
5249 ..Default::default()
5250 };
5251
5252 let http_client = DydxHttpClient::new(
5253 config.base_url_http.clone(),
5254 config.http_timeout_secs,
5255 config.http_proxy_url.clone(),
5256 config.is_testnet,
5257 None,
5258 )
5259 .unwrap();
5260
5261 let client =
5262 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5263
5264 let request_instruments = RequestInstruments::new(
5266 None,
5267 None,
5268 Some(client_id),
5269 Some(*DYDX_VENUE),
5270 UUID4::new(),
5271 get_atomic_clock_realtime().get_time_ns(),
5272 None,
5273 );
5274 assert!(client.request_instruments(&request_instruments).is_ok());
5275
5276 let instrument_id = InstrumentId::from("INVALID.DYDX");
5277 let request_instrument = RequestInstrument::new(
5278 instrument_id,
5279 None,
5280 None,
5281 Some(client_id),
5282 UUID4::new(),
5283 get_atomic_clock_realtime().get_time_ns(),
5284 None,
5285 );
5286 assert!(client.request_instrument(&request_instrument).is_ok());
5287
5288 let request_trades = RequestTrades::new(
5289 instrument_id,
5290 None,
5291 None,
5292 None,
5293 Some(client_id),
5294 UUID4::new(),
5295 get_atomic_clock_realtime().get_time_ns(),
5296 None,
5297 );
5298 assert!(client.request_trades(&request_trades).is_ok());
5299 }
5300
5301 #[tokio::test]
5302 async fn test_malformed_json_response() {
5303 use axum::{Router, routing::get};
5305
5306 #[derive(Clone)]
5307 struct MalformedState;
5308
5309 async fn malformed_markets_handler() -> String {
5310 r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5312 }
5313
5314 let app = Router::new()
5315 .route("/v4/markets", get(malformed_markets_handler))
5316 .with_state(MalformedState);
5317
5318 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5319 let server_addr = listener.local_addr().unwrap();
5320 let port = server_addr.port();
5321
5322 tokio::spawn(async move {
5323 axum::serve(listener, app).await.unwrap();
5324 });
5325
5326 wait_for_server(server_addr).await;
5327
5328 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5329 set_data_event_sender(sender);
5330
5331 let client_id = ClientId::from("DYDX-MALFORMED");
5332 let config = DydxDataClientConfig {
5333 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5334 http_timeout_secs: Some(2),
5335 ..Default::default()
5336 };
5337
5338 let http_client = DydxHttpClient::new(
5339 config.base_url_http.clone(),
5340 config.http_timeout_secs,
5341 config.http_proxy_url.clone(),
5342 config.is_testnet,
5343 None,
5344 )
5345 .unwrap();
5346
5347 let client =
5348 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5349
5350 let request = RequestInstruments::new(
5351 None,
5352 None,
5353 Some(client_id),
5354 Some(*DYDX_VENUE),
5355 UUID4::new(),
5356 get_atomic_clock_realtime().get_time_ns(),
5357 None,
5358 );
5359
5360 assert!(client.request_instruments(&request).is_ok());
5361
5362 let timeout = tokio::time::Duration::from_secs(3);
5364 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5365 tokio::time::timeout(timeout, rx.recv()).await
5366 {
5367 assert!(
5368 resp.data.is_empty(),
5369 "Expected empty response on malformed JSON"
5370 );
5371 }
5372 }
5373
5374 #[tokio::test]
5375 async fn test_missing_required_fields_in_response() {
5376 use axum::{Json, Router, routing::get};
5378 use serde_json::{Value, json};
5379
5380 #[derive(Clone)]
5381 struct MissingFieldsState;
5382
5383 async fn missing_fields_handler() -> Json<Value> {
5384 Json(json!({
5386 "markets": {
5387 "BTC-USD": {
5388 "status": "ACTIVE",
5390 "baseAsset": "BTC",
5391 "quoteAsset": "USD",
5392 }
5394 }
5395 }))
5396 }
5397
5398 let app = Router::new()
5399 .route("/v4/markets", get(missing_fields_handler))
5400 .with_state(MissingFieldsState);
5401
5402 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5403 let server_addr = listener.local_addr().unwrap();
5404 let port = server_addr.port();
5405
5406 tokio::spawn(async move {
5407 axum::serve(listener, app).await.unwrap();
5408 });
5409
5410 wait_for_server(server_addr).await;
5411
5412 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5413 set_data_event_sender(sender);
5414
5415 let client_id = ClientId::from("DYDX-MISSING");
5416 let config = DydxDataClientConfig {
5417 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5418 http_timeout_secs: Some(2),
5419 ..Default::default()
5420 };
5421
5422 let http_client = DydxHttpClient::new(
5423 config.base_url_http.clone(),
5424 config.http_timeout_secs,
5425 config.http_proxy_url.clone(),
5426 config.is_testnet,
5427 None,
5428 )
5429 .unwrap();
5430
5431 let client =
5432 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5433
5434 let request = RequestInstruments::new(
5435 None,
5436 None,
5437 Some(client_id),
5438 Some(*DYDX_VENUE),
5439 UUID4::new(),
5440 get_atomic_clock_realtime().get_time_ns(),
5441 None,
5442 );
5443
5444 assert!(client.request_instruments(&request).is_ok());
5445
5446 let timeout = tokio::time::Duration::from_secs(3);
5448 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5449 tokio::time::timeout(timeout, rx.recv()).await
5450 {
5451 assert!(resp.correlation_id == request.request_id);
5454 }
5455 }
5456
5457 #[tokio::test]
5458 async fn test_invalid_data_types_in_response() {
5459 use axum::{Json, Router, routing::get};
5461 use serde_json::{Value, json};
5462
5463 #[derive(Clone)]
5464 struct InvalidTypesState;
5465
5466 async fn invalid_types_handler() -> Json<Value> {
5467 Json(json!({
5469 "markets": {
5470 "BTC-USD": {
5471 "ticker": "BTC-USD",
5472 "status": "ACTIVE",
5473 "baseAsset": "BTC",
5474 "quoteAsset": "USD",
5475 "stepSize": "not_a_number", "tickSize": true, "minOrderSize": ["array"], "market": 12345, }
5480 }
5481 }))
5482 }
5483
5484 let app = Router::new()
5485 .route("/v4/markets", get(invalid_types_handler))
5486 .with_state(InvalidTypesState);
5487
5488 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5489 let server_addr = listener.local_addr().unwrap();
5490 let port = server_addr.port();
5491
5492 tokio::spawn(async move {
5493 axum::serve(listener, app).await.unwrap();
5494 });
5495
5496 wait_for_server(server_addr).await;
5497
5498 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5499 set_data_event_sender(sender);
5500
5501 let client_id = ClientId::from("DYDX-TYPES");
5502 let config = DydxDataClientConfig {
5503 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5504 http_timeout_secs: Some(2),
5505 ..Default::default()
5506 };
5507
5508 let http_client = DydxHttpClient::new(
5509 config.base_url_http.clone(),
5510 config.http_timeout_secs,
5511 config.http_proxy_url.clone(),
5512 config.is_testnet,
5513 None,
5514 )
5515 .unwrap();
5516
5517 let client =
5518 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5519
5520 let request = RequestInstruments::new(
5521 None,
5522 None,
5523 Some(client_id),
5524 Some(*DYDX_VENUE),
5525 UUID4::new(),
5526 get_atomic_clock_realtime().get_time_ns(),
5527 None,
5528 );
5529
5530 assert!(client.request_instruments(&request).is_ok());
5531
5532 let timeout = tokio::time::Duration::from_secs(3);
5534 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5535 tokio::time::timeout(timeout, rx.recv()).await
5536 {
5537 assert!(resp.correlation_id == request.request_id);
5539 }
5540 }
5541
5542 #[tokio::test]
5543 async fn test_unexpected_response_structure() {
5544 use axum::{Json, Router, routing::get};
5546 use serde_json::{Value, json};
5547
5548 #[derive(Clone)]
5549 struct UnexpectedState;
5550
5551 async fn unexpected_structure_handler() -> Json<Value> {
5552 Json(json!({
5554 "error": "Something went wrong",
5555 "code": 500,
5556 "data": null,
5557 "unexpected_field": {
5558 "nested": {
5559 "deeply": [1, 2, 3]
5560 }
5561 }
5562 }))
5563 }
5564
5565 let app = Router::new()
5566 .route("/v4/markets", get(unexpected_structure_handler))
5567 .with_state(UnexpectedState);
5568
5569 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5570 let server_addr = listener.local_addr().unwrap();
5571 let port = server_addr.port();
5572
5573 tokio::spawn(async move {
5574 axum::serve(listener, app).await.unwrap();
5575 });
5576
5577 wait_for_server(server_addr).await;
5578
5579 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5580 set_data_event_sender(sender);
5581
5582 let client_id = ClientId::from("DYDX-STRUCT");
5583 let config = DydxDataClientConfig {
5584 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5585 http_timeout_secs: Some(2),
5586 ..Default::default()
5587 };
5588
5589 let http_client = DydxHttpClient::new(
5590 config.base_url_http.clone(),
5591 config.http_timeout_secs,
5592 config.http_proxy_url.clone(),
5593 config.is_testnet,
5594 None,
5595 )
5596 .unwrap();
5597
5598 let client =
5599 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5600
5601 let request = RequestInstruments::new(
5602 None,
5603 None,
5604 Some(client_id),
5605 Some(*DYDX_VENUE),
5606 UUID4::new(),
5607 get_atomic_clock_realtime().get_time_ns(),
5608 None,
5609 );
5610
5611 assert!(client.request_instruments(&request).is_ok());
5612
5613 let timeout = tokio::time::Duration::from_secs(3);
5615 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5616 tokio::time::timeout(timeout, rx.recv()).await
5617 {
5618 assert!(
5619 resp.data.is_empty(),
5620 "Expected empty response on unexpected structure"
5621 );
5622 assert!(resp.correlation_id == request.request_id);
5623 }
5624 }
5625
5626 #[tokio::test]
5627 async fn test_empty_markets_object_in_response() {
5628 use axum::{Json, Router, routing::get};
5630 use serde_json::{Value, json};
5631
5632 #[derive(Clone)]
5633 struct EmptyMarketsState;
5634
5635 async fn empty_markets_handler() -> Json<Value> {
5636 Json(json!({
5637 "markets": {}
5638 }))
5639 }
5640
5641 let app = Router::new()
5642 .route("/v4/markets", get(empty_markets_handler))
5643 .with_state(EmptyMarketsState);
5644
5645 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5646 let server_addr = listener.local_addr().unwrap();
5647 let port = server_addr.port();
5648
5649 tokio::spawn(async move {
5650 axum::serve(listener, app).await.unwrap();
5651 });
5652
5653 wait_for_server(server_addr).await;
5654
5655 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5656 set_data_event_sender(sender);
5657
5658 let client_id = ClientId::from("DYDX-EMPTY");
5659 let config = DydxDataClientConfig {
5660 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5661 http_timeout_secs: Some(2),
5662 ..Default::default()
5663 };
5664
5665 let http_client = DydxHttpClient::new(
5666 config.base_url_http.clone(),
5667 config.http_timeout_secs,
5668 config.http_proxy_url.clone(),
5669 config.is_testnet,
5670 None,
5671 )
5672 .unwrap();
5673
5674 let client =
5675 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5676
5677 let request = RequestInstruments::new(
5678 None,
5679 None,
5680 Some(client_id),
5681 Some(*DYDX_VENUE),
5682 UUID4::new(),
5683 get_atomic_clock_realtime().get_time_ns(),
5684 None,
5685 );
5686
5687 assert!(client.request_instruments(&request).is_ok());
5688
5689 let timeout = tokio::time::Duration::from_secs(3);
5691 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5692 tokio::time::timeout(timeout, rx.recv()).await
5693 {
5694 assert!(
5695 resp.data.is_empty(),
5696 "Expected empty response for empty markets"
5697 );
5698 assert!(resp.correlation_id == request.request_id);
5699 }
5700 }
5701
5702 #[tokio::test]
5703 async fn test_null_values_in_response() {
5704 use axum::{Json, Router, routing::get};
5706 use serde_json::{Value, json};
5707
5708 #[derive(Clone)]
5709 struct NullValuesState;
5710
5711 async fn null_values_handler() -> Json<Value> {
5712 Json(json!({
5713 "markets": {
5714 "BTC-USD": {
5715 "ticker": null,
5716 "status": "ACTIVE",
5717 "baseAsset": null,
5718 "quoteAsset": "USD",
5719 "stepSize": null,
5720 }
5721 }
5722 }))
5723 }
5724
5725 let app = Router::new()
5726 .route("/v4/markets", get(null_values_handler))
5727 .with_state(NullValuesState);
5728
5729 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5730 let server_addr = listener.local_addr().unwrap();
5731 let port = server_addr.port();
5732
5733 tokio::spawn(async move {
5734 axum::serve(listener, app).await.unwrap();
5735 });
5736
5737 wait_for_server(server_addr).await;
5738
5739 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5740 set_data_event_sender(sender);
5741
5742 let client_id = ClientId::from("DYDX-NULL");
5743 let config = DydxDataClientConfig {
5744 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5745 http_timeout_secs: Some(2),
5746 ..Default::default()
5747 };
5748
5749 let http_client = DydxHttpClient::new(
5750 config.base_url_http.clone(),
5751 config.http_timeout_secs,
5752 config.http_proxy_url.clone(),
5753 config.is_testnet,
5754 None,
5755 )
5756 .unwrap();
5757
5758 let client =
5759 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5760
5761 let request = RequestInstruments::new(
5762 None,
5763 None,
5764 Some(client_id),
5765 Some(*DYDX_VENUE),
5766 UUID4::new(),
5767 get_atomic_clock_realtime().get_time_ns(),
5768 None,
5769 );
5770
5771 assert!(client.request_instruments(&request).is_ok());
5772
5773 let timeout = tokio::time::Duration::from_secs(3);
5775 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5776 tokio::time::timeout(timeout, rx.recv()).await
5777 {
5778 assert!(resp.correlation_id == request.request_id);
5780 }
5781 }
5782
5783 #[tokio::test]
5784 async fn test_invalid_instrument_id_format() {
5785 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5787 set_data_event_sender(sender);
5788
5789 let client_id = ClientId::from("DYDX-INVALID-ID");
5790 let config = DydxDataClientConfig::default();
5791
5792 let http_client = DydxHttpClient::new(
5793 config.base_url_http.clone(),
5794 config.http_timeout_secs,
5795 config.http_proxy_url.clone(),
5796 config.is_testnet,
5797 None,
5798 )
5799 .unwrap();
5800
5801 let client =
5802 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5803
5804 let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5806
5807 let request = RequestInstrument::new(
5808 non_existent_id,
5809 None,
5810 None,
5811 Some(client_id),
5812 UUID4::new(),
5813 get_atomic_clock_realtime().get_time_ns(),
5814 None,
5815 );
5816
5817 assert!(client.request_instrument(&request).is_ok());
5818
5819 let timeout = tokio::time::Duration::from_secs(2);
5821 let result = tokio::time::timeout(timeout, rx.recv()).await;
5822
5823 match result {
5825 Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5826 }
5828 Ok(None) | Err(_) => {
5829 }
5831 _ => {}
5832 }
5833 }
5834
5835 #[tokio::test]
5836 async fn test_invalid_date_range_end_before_start() {
5837 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5839 set_data_event_sender(sender);
5840
5841 let client_id = ClientId::from("DYDX-DATE-RANGE");
5842 let config = DydxDataClientConfig::default();
5843
5844 let http_client = DydxHttpClient::new(
5845 config.base_url_http.clone(),
5846 config.http_timeout_secs,
5847 config.http_proxy_url.clone(),
5848 config.is_testnet,
5849 None,
5850 )
5851 .unwrap();
5852
5853 let client =
5854 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5855
5856 let instrument = create_test_instrument_any();
5857 let instrument_id = instrument.id();
5858 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5859 client.instruments.insert(symbol_key, instrument);
5860
5861 let start = Utc::now();
5863 let end = start - chrono::Duration::hours(24); let request = RequestTrades::new(
5866 instrument_id,
5867 Some(start),
5868 Some(end),
5869 None,
5870 Some(client_id),
5871 UUID4::new(),
5872 get_atomic_clock_realtime().get_time_ns(),
5873 None,
5874 );
5875
5876 assert!(client.request_trades(&request).is_ok());
5877
5878 let timeout = tokio::time::Duration::from_secs(2);
5880 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5881 tokio::time::timeout(timeout, rx.recv()).await
5882 {
5883 assert!(resp.correlation_id == request.request_id);
5885 }
5886 }
5887
5888 #[tokio::test]
5889 async fn test_negative_limit_value() {
5890 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5893 set_data_event_sender(sender);
5894
5895 let client_id = ClientId::from("DYDX-NEG-LIMIT");
5896 let config = DydxDataClientConfig::default();
5897
5898 let http_client = DydxHttpClient::new(
5899 config.base_url_http.clone(),
5900 config.http_timeout_secs,
5901 config.http_proxy_url.clone(),
5902 config.is_testnet,
5903 None,
5904 )
5905 .unwrap();
5906
5907 let client =
5908 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5909
5910 let instrument = create_test_instrument_any();
5911 let instrument_id = instrument.id();
5912 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5913 client.instruments.insert(symbol_key, instrument);
5914
5915 let request = RequestTrades::new(
5917 instrument_id,
5918 None,
5919 None,
5920 std::num::NonZeroUsize::new(1), Some(client_id),
5922 UUID4::new(),
5923 get_atomic_clock_realtime().get_time_ns(),
5924 None,
5925 );
5926
5927 assert!(client.request_trades(&request).is_ok());
5929 }
5930
5931 #[tokio::test]
5932 async fn test_zero_limit_value() {
5933 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5936 set_data_event_sender(sender);
5937
5938 let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5939 let config = DydxDataClientConfig::default();
5940
5941 let http_client = DydxHttpClient::new(
5942 config.base_url_http.clone(),
5943 config.http_timeout_secs,
5944 config.http_proxy_url.clone(),
5945 config.is_testnet,
5946 None,
5947 )
5948 .unwrap();
5949
5950 let client =
5951 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
5952
5953 let instrument = create_test_instrument_any();
5954 let instrument_id = instrument.id();
5955 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5956 client.instruments.insert(symbol_key, instrument);
5957
5958 let request = RequestTrades::new(
5959 instrument_id,
5960 None,
5961 None,
5962 None, Some(client_id),
5964 UUID4::new(),
5965 get_atomic_clock_realtime().get_time_ns(),
5966 None,
5967 );
5968
5969 assert!(client.request_trades(&request).is_ok());
5970
5971 let timeout = tokio::time::Duration::from_secs(2);
5973 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5974 tokio::time::timeout(timeout, rx.recv()).await
5975 {
5976 assert!(resp.correlation_id == request.request_id);
5977 }
5978 }
5979
5980 #[tokio::test]
5981 async fn test_very_large_limit_value() {
5982 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5984 set_data_event_sender(sender);
5985
5986 let client_id = ClientId::from("DYDX-LARGE-LIMIT");
5987 let config = DydxDataClientConfig::default();
5988
5989 let http_client = DydxHttpClient::new(
5990 config.base_url_http.clone(),
5991 config.http_timeout_secs,
5992 config.http_proxy_url.clone(),
5993 config.is_testnet,
5994 None,
5995 )
5996 .unwrap();
5997
5998 let client =
5999 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6000
6001 let instrument = create_test_instrument_any();
6002 let instrument_id = instrument.id();
6003 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6004 client.instruments.insert(symbol_key, instrument);
6005
6006 let request = RequestTrades::new(
6007 instrument_id,
6008 None,
6009 None,
6010 std::num::NonZeroUsize::new(1_000_000), Some(client_id),
6012 UUID4::new(),
6013 get_atomic_clock_realtime().get_time_ns(),
6014 None,
6015 );
6016
6017 assert!(client.request_trades(&request).is_ok());
6019
6020 let timeout = tokio::time::Duration::from_secs(2);
6022 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6023 tokio::time::timeout(timeout, rx.recv()).await
6024 {
6025 assert!(resp.correlation_id == request.request_id);
6026 }
6027 }
6028
6029 #[tokio::test]
6030 async fn test_none_limit_uses_default() {
6031 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6033 set_data_event_sender(sender);
6034
6035 let client_id = ClientId::from("DYDX-NONE-LIMIT");
6036 let config = DydxDataClientConfig::default();
6037
6038 let http_client = DydxHttpClient::new(
6039 config.base_url_http.clone(),
6040 config.http_timeout_secs,
6041 config.http_proxy_url.clone(),
6042 config.is_testnet,
6043 None,
6044 )
6045 .unwrap();
6046
6047 let client =
6048 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6049
6050 let instrument = create_test_instrument_any();
6051 let instrument_id = instrument.id();
6052 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6053 client.instruments.insert(symbol_key, instrument);
6054
6055 let request = RequestTrades::new(
6056 instrument_id,
6057 None,
6058 None,
6059 None, Some(client_id),
6061 UUID4::new(),
6062 get_atomic_clock_realtime().get_time_ns(),
6063 None,
6064 );
6065
6066 assert!(client.request_trades(&request).is_ok());
6068
6069 let timeout = tokio::time::Duration::from_secs(2);
6070 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6071 tokio::time::timeout(timeout, rx.recv()).await
6072 {
6073 assert!(resp.correlation_id == request.request_id);
6074 }
6075 }
6076
6077 #[tokio::test]
6078 async fn test_validation_does_not_panic() {
6079 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6081 set_data_event_sender(sender);
6082
6083 let client_id = ClientId::from("DYDX-VALIDATION");
6084 let config = DydxDataClientConfig::default();
6085
6086 let http_client = DydxHttpClient::new(
6087 config.base_url_http.clone(),
6088 config.http_timeout_secs,
6089 config.http_proxy_url.clone(),
6090 config.is_testnet,
6091 None,
6092 )
6093 .unwrap();
6094
6095 let client =
6096 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6097
6098 let instrument = create_test_instrument_any();
6099 let instrument_id = instrument.id();
6100 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6101 client.instruments.insert(symbol_key, instrument);
6102
6103 let invalid_id = InstrumentId::from("INVALID.WRONG");
6105 let req1 = RequestInstrument::new(
6106 invalid_id,
6107 None,
6108 None,
6109 Some(client_id),
6110 UUID4::new(),
6111 get_atomic_clock_realtime().get_time_ns(),
6112 None,
6113 );
6114 assert!(client.request_instrument(&req1).is_ok());
6115
6116 let start = Utc::now();
6118 let end = start - chrono::Duration::hours(1);
6119 let req2 = RequestTrades::new(
6120 instrument_id,
6121 Some(start),
6122 Some(end),
6123 None,
6124 Some(client_id),
6125 UUID4::new(),
6126 get_atomic_clock_realtime().get_time_ns(),
6127 None,
6128 );
6129 assert!(client.request_trades(&req2).is_ok());
6130
6131 let req3 = RequestTrades::new(
6133 instrument_id,
6134 None,
6135 None,
6136 std::num::NonZeroUsize::new(1),
6137 Some(client_id),
6138 UUID4::new(),
6139 get_atomic_clock_realtime().get_time_ns(),
6140 None,
6141 );
6142 assert!(client.request_trades(&req3).is_ok());
6143
6144 let req4 = RequestTrades::new(
6146 instrument_id,
6147 None,
6148 None,
6149 std::num::NonZeroUsize::new(usize::MAX),
6150 Some(client_id),
6151 UUID4::new(),
6152 get_atomic_clock_realtime().get_time_ns(),
6153 None,
6154 );
6155 assert!(client.request_trades(&req4).is_ok());
6156
6157 }
6159
6160 #[tokio::test]
6161 async fn test_instruments_response_has_correct_venue() {
6162 use axum::{Json, Router, routing::get};
6164 use serde_json::{Value, json};
6165
6166 #[derive(Clone)]
6167 struct VenueTestState;
6168
6169 async fn venue_handler() -> Json<Value> {
6170 Json(json!({
6171 "markets": {
6172 "BTC-USD": {
6173 "ticker": "BTC-USD",
6174 "status": "ACTIVE",
6175 "baseAsset": "BTC",
6176 "quoteAsset": "USD",
6177 "stepSize": "0.0001",
6178 "tickSize": "1",
6179 "indexPrice": "50000",
6180 "oraclePrice": "50000",
6181 "priceChange24H": "1000",
6182 "nextFundingRate": "0.0001",
6183 "nextFundingAt": "2024-01-01T00:00:00.000Z",
6184 "minOrderSize": "0.001",
6185 "type": "PERPETUAL",
6186 "initialMarginFraction": "0.05",
6187 "maintenanceMarginFraction": "0.03",
6188 "volume24H": "1000000",
6189 "trades24H": "10000",
6190 "openInterest": "5000000",
6191 "incrementalInitialMarginFraction": "0.01",
6192 "incrementalPositionSize": "10",
6193 "maxPositionSize": "1000",
6194 "baselinePositionSize": "100",
6195 "assetResolution": "10000000000"
6196 }
6197 }
6198 }))
6199 }
6200
6201 let app = Router::new()
6202 .route("/v4/markets", get(venue_handler))
6203 .with_state(VenueTestState);
6204
6205 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
6206 let server_addr = listener.local_addr().unwrap();
6207 let port = server_addr.port();
6208
6209 tokio::spawn(async move {
6210 axum::serve(listener, app).await.unwrap();
6211 });
6212
6213 wait_for_server(server_addr).await;
6214
6215 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6216 set_data_event_sender(sender);
6217
6218 let client_id = ClientId::from("DYDX-VENUE-TEST");
6219 let config = DydxDataClientConfig {
6220 base_url_http: Some(format!("http://127.0.0.1:{port}")),
6221 http_timeout_secs: Some(2),
6222 ..Default::default()
6223 };
6224
6225 let http_client = DydxHttpClient::new(
6226 config.base_url_http.clone(),
6227 config.http_timeout_secs,
6228 config.http_proxy_url.clone(),
6229 config.is_testnet,
6230 None,
6231 )
6232 .unwrap();
6233
6234 let client =
6235 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6236
6237 let request = RequestInstruments::new(
6238 None,
6239 None,
6240 Some(client_id),
6241 Some(*DYDX_VENUE),
6242 UUID4::new(),
6243 get_atomic_clock_realtime().get_time_ns(),
6244 None,
6245 );
6246
6247 assert!(client.request_instruments(&request).is_ok());
6248
6249 let timeout = tokio::time::Duration::from_secs(3);
6250 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6251 tokio::time::timeout(timeout, rx.recv()).await
6252 {
6253 assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6255 }
6256 }
6257
6258 #[tokio::test]
6259 async fn test_instruments_response_contains_vec_instrument_any() {
6260 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6262 set_data_event_sender(sender);
6263
6264 let client_id = ClientId::from("DYDX-VEC-TEST");
6265 let config = DydxDataClientConfig::default();
6266
6267 let http_client = DydxHttpClient::new(
6268 config.base_url_http.clone(),
6269 config.http_timeout_secs,
6270 config.http_proxy_url.clone(),
6271 config.is_testnet,
6272 None,
6273 )
6274 .unwrap();
6275
6276 let client =
6277 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6278
6279 let request = RequestInstruments::new(
6280 None,
6281 None,
6282 Some(client_id),
6283 Some(*DYDX_VENUE),
6284 UUID4::new(),
6285 get_atomic_clock_realtime().get_time_ns(),
6286 None,
6287 );
6288
6289 assert!(client.request_instruments(&request).is_ok());
6290
6291 let timeout = tokio::time::Duration::from_secs(2);
6292 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6293 tokio::time::timeout(timeout, rx.recv()).await
6294 {
6295 assert!(
6297 resp.data.is_empty() || !resp.data.is_empty(),
6298 "data should be Vec<InstrumentAny>"
6299 );
6300 }
6301 }
6302
6303 #[tokio::test]
6304 async fn test_instruments_response_includes_correlation_id() {
6305 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6307 set_data_event_sender(sender);
6308
6309 let client_id = ClientId::from("DYDX-CORR-TEST");
6310 let config = DydxDataClientConfig::default();
6311
6312 let http_client = DydxHttpClient::new(
6313 config.base_url_http.clone(),
6314 config.http_timeout_secs,
6315 config.http_proxy_url.clone(),
6316 config.is_testnet,
6317 None,
6318 )
6319 .unwrap();
6320
6321 let client =
6322 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6323
6324 let request_id = UUID4::new();
6325 let request = RequestInstruments::new(
6326 None,
6327 None,
6328 Some(client_id),
6329 Some(*DYDX_VENUE),
6330 request_id,
6331 get_atomic_clock_realtime().get_time_ns(),
6332 None,
6333 );
6334
6335 assert!(client.request_instruments(&request).is_ok());
6336
6337 let timeout = tokio::time::Duration::from_secs(2);
6338 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6339 tokio::time::timeout(timeout, rx.recv()).await
6340 {
6341 assert_eq!(
6343 resp.correlation_id, request_id,
6344 "correlation_id should match request_id"
6345 );
6346 }
6347 }
6348
6349 #[tokio::test]
6350 async fn test_instruments_response_includes_client_id() {
6351 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6353 set_data_event_sender(sender);
6354
6355 let client_id = ClientId::from("DYDX-CLIENT-TEST");
6356 let config = DydxDataClientConfig::default();
6357
6358 let http_client = DydxHttpClient::new(
6359 config.base_url_http.clone(),
6360 config.http_timeout_secs,
6361 config.http_proxy_url.clone(),
6362 config.is_testnet,
6363 None,
6364 )
6365 .unwrap();
6366
6367 let client =
6368 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6369
6370 let request = RequestInstruments::new(
6371 None,
6372 None,
6373 Some(client_id),
6374 Some(*DYDX_VENUE),
6375 UUID4::new(),
6376 get_atomic_clock_realtime().get_time_ns(),
6377 None,
6378 );
6379
6380 assert!(client.request_instruments(&request).is_ok());
6381
6382 let timeout = tokio::time::Duration::from_secs(2);
6383 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6384 tokio::time::timeout(timeout, rx.recv()).await
6385 {
6386 assert_eq!(
6388 resp.client_id, client_id,
6389 "client_id should be included in response"
6390 );
6391 }
6392 }
6393
6394 #[tokio::test]
6395 async fn test_instruments_response_includes_timestamps() {
6396 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6398 set_data_event_sender(sender);
6399
6400 let client_id = ClientId::from("DYDX-TS-TEST");
6401 let config = DydxDataClientConfig::default();
6402
6403 let http_client = DydxHttpClient::new(
6404 config.base_url_http.clone(),
6405 config.http_timeout_secs,
6406 config.http_proxy_url.clone(),
6407 config.is_testnet,
6408 None,
6409 )
6410 .unwrap();
6411
6412 let client =
6413 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6414
6415 let start = Some(Utc::now() - chrono::Duration::days(1));
6416 let end = Some(Utc::now());
6417 let ts_init = get_atomic_clock_realtime().get_time_ns();
6418
6419 let request = RequestInstruments::new(
6420 start,
6421 end,
6422 Some(client_id),
6423 Some(*DYDX_VENUE),
6424 UUID4::new(),
6425 ts_init,
6426 None,
6427 );
6428
6429 assert!(client.request_instruments(&request).is_ok());
6430
6431 let timeout = tokio::time::Duration::from_secs(2);
6432 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6433 tokio::time::timeout(timeout, rx.recv()).await
6434 {
6435 assert!(
6437 resp.start.is_some() || resp.start.is_none(),
6438 "start timestamp field exists"
6439 );
6440 assert!(
6441 resp.end.is_some() || resp.end.is_none(),
6442 "end timestamp field exists"
6443 );
6444 assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6445 }
6446 }
6447
6448 #[tokio::test]
6449 async fn test_instruments_response_includes_params_when_provided() {
6450 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6452 set_data_event_sender(sender);
6453
6454 let client_id = ClientId::from("DYDX-PARAMS-TEST");
6455 let config = DydxDataClientConfig::default();
6456
6457 let http_client = DydxHttpClient::new(
6458 config.base_url_http.clone(),
6459 config.http_timeout_secs,
6460 config.http_proxy_url.clone(),
6461 config.is_testnet,
6462 None,
6463 )
6464 .unwrap();
6465
6466 let client =
6467 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6468
6469 let request = RequestInstruments::new(
6472 None,
6473 None,
6474 Some(client_id),
6475 Some(*DYDX_VENUE),
6476 UUID4::new(),
6477 get_atomic_clock_realtime().get_time_ns(),
6478 None, );
6480
6481 assert!(client.request_instruments(&request).is_ok());
6482
6483 let timeout = tokio::time::Duration::from_secs(2);
6484 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6485 tokio::time::timeout(timeout, rx.recv()).await
6486 {
6487 let _params = resp.params;
6489 }
6490 }
6491
6492 #[tokio::test]
6493 async fn test_instruments_response_params_none_when_not_provided() {
6494 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6496 set_data_event_sender(sender);
6497
6498 let client_id = ClientId::from("DYDX-NO-PARAMS");
6499 let config = DydxDataClientConfig::default();
6500
6501 let http_client = DydxHttpClient::new(
6502 config.base_url_http.clone(),
6503 config.http_timeout_secs,
6504 config.http_proxy_url.clone(),
6505 config.is_testnet,
6506 None,
6507 )
6508 .unwrap();
6509
6510 let client =
6511 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6512
6513 let request = RequestInstruments::new(
6514 None,
6515 None,
6516 Some(client_id),
6517 Some(*DYDX_VENUE),
6518 UUID4::new(),
6519 get_atomic_clock_realtime().get_time_ns(),
6520 None, );
6522
6523 assert!(client.request_instruments(&request).is_ok());
6524
6525 let timeout = tokio::time::Duration::from_secs(2);
6526 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6527 tokio::time::timeout(timeout, rx.recv()).await
6528 {
6529 assert!(
6531 resp.params.is_none(),
6532 "params should be None when not provided"
6533 );
6534 }
6535 }
6536
6537 #[tokio::test]
6538 async fn test_instruments_response_complete_structure() {
6539 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6541 set_data_event_sender(sender);
6542
6543 let client_id = ClientId::from("DYDX-FULL-TEST");
6544 let config = DydxDataClientConfig::default();
6545
6546 let http_client = DydxHttpClient::new(
6547 config.base_url_http.clone(),
6548 config.http_timeout_secs,
6549 config.http_proxy_url.clone(),
6550 config.is_testnet,
6551 None,
6552 )
6553 .unwrap();
6554
6555 let client =
6556 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6557
6558 let request_id = UUID4::new();
6559 let start = Some(Utc::now() - chrono::Duration::hours(1));
6560 let end = Some(Utc::now());
6561 let ts_init = get_atomic_clock_realtime().get_time_ns();
6562
6563 let request = RequestInstruments::new(
6564 start,
6565 end,
6566 Some(client_id),
6567 Some(*DYDX_VENUE),
6568 request_id,
6569 ts_init,
6570 None,
6571 );
6572
6573 assert!(client.request_instruments(&request).is_ok());
6574
6575 let timeout = tokio::time::Duration::from_secs(2);
6576 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6577 tokio::time::timeout(timeout, rx.recv()).await
6578 {
6579 assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6581 assert_eq!(
6582 resp.correlation_id, request_id,
6583 "correlation_id should match"
6584 );
6585 assert_eq!(resp.client_id, client_id, "client_id should match");
6586 assert!(resp.ts_init > 0, "ts_init should be set");
6587
6588 let _data: Vec<InstrumentAny> = resp.data;
6590
6591 let _start = resp.start;
6593 let _end = resp.end;
6594 let _params = resp.params;
6595 }
6596 }
6597
6598 #[tokio::test]
6599 async fn test_instrument_response_properly_boxed() {
6600 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6602 set_data_event_sender(sender);
6603
6604 let client_id = ClientId::from("DYDX-BOXED-TEST");
6605 let config = DydxDataClientConfig::default();
6606
6607 let http_client = DydxHttpClient::new(
6608 config.base_url_http.clone(),
6609 config.http_timeout_secs,
6610 config.http_proxy_url.clone(),
6611 config.is_testnet,
6612 None,
6613 )
6614 .unwrap();
6615
6616 let client =
6617 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6618
6619 let instrument = create_test_instrument_any();
6620 let instrument_id = instrument.id();
6621 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6622 client.instruments.insert(symbol_key, instrument);
6623
6624 let request = RequestInstrument::new(
6625 instrument_id,
6626 None,
6627 None,
6628 Some(client_id),
6629 UUID4::new(),
6630 get_atomic_clock_realtime().get_time_ns(),
6631 None,
6632 );
6633
6634 assert!(client.request_instrument(&request).is_ok());
6635
6636 let timeout = tokio::time::Duration::from_secs(2);
6637 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6638 tokio::time::timeout(timeout, rx.recv()).await
6639 {
6640 let _response: Box<InstrumentResponse> = boxed_resp;
6642 }
6644 }
6645
6646 #[tokio::test]
6647 async fn test_instrument_response_contains_single_instrument() {
6648 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6650 set_data_event_sender(sender);
6651
6652 let client_id = ClientId::from("DYDX-SINGLE-TEST");
6653 let config = DydxDataClientConfig::default();
6654
6655 let http_client = DydxHttpClient::new(
6656 config.base_url_http.clone(),
6657 config.http_timeout_secs,
6658 config.http_proxy_url.clone(),
6659 config.is_testnet,
6660 None,
6661 )
6662 .unwrap();
6663
6664 let client =
6665 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6666
6667 let instrument = create_test_instrument_any();
6668 let instrument_id = instrument.id();
6669 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6670 client.instruments.insert(symbol_key, instrument.clone());
6671
6672 let request = RequestInstrument::new(
6673 instrument_id,
6674 None,
6675 None,
6676 Some(client_id),
6677 UUID4::new(),
6678 get_atomic_clock_realtime().get_time_ns(),
6679 None,
6680 );
6681
6682 assert!(client.request_instrument(&request).is_ok());
6683
6684 let timeout = tokio::time::Duration::from_secs(2);
6685 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6686 tokio::time::timeout(timeout, rx.recv()).await
6687 {
6688 let _instrument: InstrumentAny = resp.data;
6690 }
6692 }
6693
6694 #[tokio::test]
6695 async fn test_instrument_response_has_correct_instrument_id() {
6696 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6698 set_data_event_sender(sender);
6699
6700 let client_id = ClientId::from("DYDX-ID-TEST");
6701 let config = DydxDataClientConfig::default();
6702
6703 let http_client = DydxHttpClient::new(
6704 config.base_url_http.clone(),
6705 config.http_timeout_secs,
6706 config.http_proxy_url.clone(),
6707 config.is_testnet,
6708 None,
6709 )
6710 .unwrap();
6711
6712 let client =
6713 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6714
6715 let instrument = create_test_instrument_any();
6716 let instrument_id = instrument.id();
6717 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6718 client.instruments.insert(symbol_key, instrument);
6719
6720 let request = RequestInstrument::new(
6721 instrument_id,
6722 None,
6723 None,
6724 Some(client_id),
6725 UUID4::new(),
6726 get_atomic_clock_realtime().get_time_ns(),
6727 None,
6728 );
6729
6730 assert!(client.request_instrument(&request).is_ok());
6731
6732 let timeout = tokio::time::Duration::from_secs(2);
6733 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6734 tokio::time::timeout(timeout, rx.recv()).await
6735 {
6736 assert_eq!(
6738 resp.instrument_id, instrument_id,
6739 "instrument_id should match requested ID"
6740 );
6741 }
6742 }
6743
6744 #[tokio::test]
6745 async fn test_instrument_response_includes_metadata() {
6746 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6748 set_data_event_sender(sender);
6749
6750 let client_id = ClientId::from("DYDX-META-TEST");
6751 let config = DydxDataClientConfig::default();
6752
6753 let http_client = DydxHttpClient::new(
6754 config.base_url_http.clone(),
6755 config.http_timeout_secs,
6756 config.http_proxy_url.clone(),
6757 config.is_testnet,
6758 None,
6759 )
6760 .unwrap();
6761
6762 let client =
6763 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6764
6765 let instrument = create_test_instrument_any();
6766 let instrument_id = instrument.id();
6767 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6768 client.instruments.insert(symbol_key, instrument);
6769
6770 let request_id = UUID4::new();
6771 let start = Some(Utc::now() - chrono::Duration::hours(1));
6772 let end = Some(Utc::now());
6773 let ts_init = get_atomic_clock_realtime().get_time_ns();
6774
6775 let request = RequestInstrument::new(
6776 instrument_id,
6777 start,
6778 end,
6779 Some(client_id),
6780 request_id,
6781 ts_init,
6782 None,
6783 );
6784
6785 assert!(client.request_instrument(&request).is_ok());
6786
6787 let timeout = tokio::time::Duration::from_secs(2);
6788 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6789 tokio::time::timeout(timeout, rx.recv()).await
6790 {
6791 assert_eq!(
6793 resp.correlation_id, request_id,
6794 "correlation_id should match"
6795 );
6796 assert_eq!(resp.client_id, client_id, "client_id should match");
6797 assert!(resp.ts_init > 0, "ts_init should be set");
6798
6799 let _start = resp.start;
6801 let _end = resp.end;
6802
6803 let _params = resp.params;
6805 }
6806 }
6807
6808 #[tokio::test]
6809 async fn test_instrument_response_matches_requested_instrument() {
6810 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6812 set_data_event_sender(sender);
6813
6814 let client_id = ClientId::from("DYDX-MATCH-TEST");
6815 let config = DydxDataClientConfig::default();
6816
6817 let http_client = DydxHttpClient::new(
6818 config.base_url_http.clone(),
6819 config.http_timeout_secs,
6820 config.http_proxy_url.clone(),
6821 config.is_testnet,
6822 None,
6823 )
6824 .unwrap();
6825
6826 let client =
6827 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6828
6829 let instrument = create_test_instrument_any();
6830 let instrument_id = instrument.id();
6831 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6832 client.instruments.insert(symbol_key, instrument.clone());
6833
6834 let request = RequestInstrument::new(
6835 instrument_id,
6836 None,
6837 None,
6838 Some(client_id),
6839 UUID4::new(),
6840 get_atomic_clock_realtime().get_time_ns(),
6841 None,
6842 );
6843
6844 assert!(client.request_instrument(&request).is_ok());
6845
6846 let timeout = tokio::time::Duration::from_secs(2);
6847 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6848 tokio::time::timeout(timeout, rx.recv()).await
6849 {
6850 assert_eq!(
6852 resp.data.id(),
6853 instrument_id,
6854 "Returned instrument should match requested"
6855 );
6856 assert_eq!(
6857 resp.instrument_id, instrument_id,
6858 "instrument_id field should match"
6859 );
6860
6861 assert_eq!(
6863 resp.data.id(),
6864 resp.instrument_id,
6865 "data.id() should match instrument_id field"
6866 );
6867 }
6868 }
6869
6870 #[tokio::test]
6871 async fn test_instrument_response_complete_structure() {
6872 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6874 set_data_event_sender(sender);
6875
6876 let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6877 let config = DydxDataClientConfig::default();
6878
6879 let http_client = DydxHttpClient::new(
6880 config.base_url_http.clone(),
6881 config.http_timeout_secs,
6882 config.http_proxy_url.clone(),
6883 config.is_testnet,
6884 None,
6885 )
6886 .unwrap();
6887
6888 let client =
6889 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6890
6891 let instrument = create_test_instrument_any();
6892 let instrument_id = instrument.id();
6893 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6894 client.instruments.insert(symbol_key, instrument.clone());
6895
6896 let request_id = UUID4::new();
6897 let ts_init = get_atomic_clock_realtime().get_time_ns();
6898
6899 let request = RequestInstrument::new(
6900 instrument_id,
6901 None,
6902 None,
6903 Some(client_id),
6904 request_id,
6905 ts_init,
6906 None,
6907 );
6908
6909 assert!(client.request_instrument(&request).is_ok());
6910
6911 let timeout = tokio::time::Duration::from_secs(2);
6912 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6913 tokio::time::timeout(timeout, rx.recv()).await
6914 {
6915 let _boxed: Box<InstrumentResponse> = resp.clone();
6918
6919 assert_eq!(resp.correlation_id, request_id);
6921 assert_eq!(resp.client_id, client_id);
6922 assert_eq!(resp.instrument_id, instrument_id);
6923 assert!(resp.ts_init > 0);
6924
6925 let returned_instrument: InstrumentAny = resp.data;
6927 assert_eq!(returned_instrument.id(), instrument_id);
6928
6929 let _start = resp.start;
6931 let _end = resp.end;
6932 let _params = resp.params;
6933 }
6934 }
6935
6936 #[tokio::test]
6937 async fn test_trades_response_contains_vec_trade_tick() {
6938 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6940 set_data_event_sender(sender);
6941
6942 let created_at = Utc::now();
6943 let http_trades = vec![
6944 crate::http::models::Trade {
6945 id: "trade-1".to_string(),
6946 side: OrderSide::Buy,
6947 size: dec!(1.0),
6948 price: dec!(100.0),
6949 created_at,
6950 created_at_height: 100,
6951 trade_type: crate::common::enums::DydxTradeType::Limit,
6952 },
6953 crate::http::models::Trade {
6954 id: "trade-2".to_string(),
6955 side: OrderSide::Sell,
6956 size: dec!(2.0),
6957 price: dec!(101.0),
6958 created_at: created_at + chrono::Duration::seconds(1),
6959 created_at_height: 101,
6960 trade_type: crate::common::enums::DydxTradeType::Limit,
6961 },
6962 ];
6963
6964 let trades_response = crate::http::models::TradesResponse {
6965 trades: http_trades,
6966 };
6967
6968 let state = TradesTestState {
6969 response: Arc::new(trades_response),
6970 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6971 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6972 };
6973
6974 let addr = start_trades_test_server(state).await;
6975 let base_url = format!("http://{addr}");
6976
6977 let client_id = ClientId::from("DYDX-VEC-TEST");
6978 let config = DydxDataClientConfig {
6979 base_url_http: Some(base_url),
6980 is_testnet: true,
6981 ..Default::default()
6982 };
6983
6984 let http_client = DydxHttpClient::new(
6985 config.base_url_http.clone(),
6986 config.http_timeout_secs,
6987 config.http_proxy_url.clone(),
6988 config.is_testnet,
6989 None,
6990 )
6991 .unwrap();
6992
6993 let client =
6994 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
6995
6996 let instrument = create_test_instrument_any();
6997 let instrument_id = instrument.id();
6998 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6999 client.instruments.insert(symbol_key, instrument);
7000
7001 let request = RequestTrades::new(
7002 instrument_id,
7003 None,
7004 None,
7005 None,
7006 Some(client_id),
7007 UUID4::new(),
7008 get_atomic_clock_realtime().get_time_ns(),
7009 None,
7010 );
7011
7012 assert!(client.request_trades(&request).is_ok());
7013
7014 let timeout = tokio::time::Duration::from_millis(500);
7015 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7016 tokio::time::timeout(timeout, rx.recv()).await
7017 {
7018 let trade_ticks: Vec<TradeTick> = resp.data;
7020 assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7021
7022 for tick in &trade_ticks {
7024 assert_eq!(tick.instrument_id, instrument_id);
7025 }
7026 }
7027 }
7028
7029 #[tokio::test]
7030 async fn test_trades_response_has_correct_instrument_id() {
7031 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7033 set_data_event_sender(sender);
7034
7035 let created_at = Utc::now();
7036 let http_trade = crate::http::models::Trade {
7037 id: "instrument-id-test".to_string(),
7038 side: OrderSide::Buy,
7039 size: dec!(1.0),
7040 price: dec!(100.0),
7041 created_at,
7042 created_at_height: 100,
7043 trade_type: crate::common::enums::DydxTradeType::Limit,
7044 };
7045
7046 let trades_response = crate::http::models::TradesResponse {
7047 trades: vec![http_trade],
7048 };
7049
7050 let state = TradesTestState {
7051 response: Arc::new(trades_response),
7052 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7053 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7054 };
7055
7056 let addr = start_trades_test_server(state).await;
7057 let base_url = format!("http://{addr}");
7058
7059 let client_id = ClientId::from("DYDX-INSTID-TEST");
7060 let config = DydxDataClientConfig {
7061 base_url_http: Some(base_url),
7062 is_testnet: true,
7063 ..Default::default()
7064 };
7065
7066 let http_client = DydxHttpClient::new(
7067 config.base_url_http.clone(),
7068 config.http_timeout_secs,
7069 config.http_proxy_url.clone(),
7070 config.is_testnet,
7071 None,
7072 )
7073 .unwrap();
7074
7075 let client =
7076 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7077
7078 let instrument = create_test_instrument_any();
7079 let instrument_id = instrument.id();
7080 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7081 client.instruments.insert(symbol_key, instrument);
7082
7083 let request = RequestTrades::new(
7084 instrument_id,
7085 None,
7086 None,
7087 None,
7088 Some(client_id),
7089 UUID4::new(),
7090 get_atomic_clock_realtime().get_time_ns(),
7091 None,
7092 );
7093
7094 assert!(client.request_trades(&request).is_ok());
7095
7096 let timeout = tokio::time::Duration::from_millis(500);
7097 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7098 tokio::time::timeout(timeout, rx.recv()).await
7099 {
7100 assert_eq!(
7102 resp.instrument_id, instrument_id,
7103 "TradesResponse.instrument_id should match request"
7104 );
7105
7106 for tick in &resp.data {
7108 assert_eq!(
7109 tick.instrument_id, instrument_id,
7110 "Each TradeTick should have correct instrument_id"
7111 );
7112 }
7113 }
7114 }
7115
7116 #[tokio::test]
7117 async fn test_trades_response_properly_ordered_by_timestamp() {
7118 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7120 set_data_event_sender(sender);
7121
7122 let base_time = Utc::now();
7123 let http_trades = vec![
7124 crate::http::models::Trade {
7125 id: "trade-oldest".to_string(),
7126 side: OrderSide::Buy,
7127 size: dec!(1.0),
7128 price: dec!(100.0),
7129 created_at: base_time,
7130 created_at_height: 100,
7131 trade_type: crate::common::enums::DydxTradeType::Limit,
7132 },
7133 crate::http::models::Trade {
7134 id: "trade-middle".to_string(),
7135 side: OrderSide::Sell,
7136 size: dec!(2.0),
7137 price: dec!(101.0),
7138 created_at: base_time + chrono::Duration::seconds(1),
7139 created_at_height: 101,
7140 trade_type: crate::common::enums::DydxTradeType::Limit,
7141 },
7142 crate::http::models::Trade {
7143 id: "trade-newest".to_string(),
7144 side: OrderSide::Buy,
7145 size: dec!(3.0),
7146 price: dec!(102.0),
7147 created_at: base_time + chrono::Duration::seconds(2),
7148 created_at_height: 102,
7149 trade_type: crate::common::enums::DydxTradeType::Limit,
7150 },
7151 ];
7152
7153 let trades_response = crate::http::models::TradesResponse {
7154 trades: http_trades,
7155 };
7156
7157 let state = TradesTestState {
7158 response: Arc::new(trades_response),
7159 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7160 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7161 };
7162
7163 let addr = start_trades_test_server(state).await;
7164 let base_url = format!("http://{addr}");
7165
7166 let client_id = ClientId::from("DYDX-ORDER-TEST");
7167 let config = DydxDataClientConfig {
7168 base_url_http: Some(base_url),
7169 is_testnet: true,
7170 ..Default::default()
7171 };
7172
7173 let http_client = DydxHttpClient::new(
7174 config.base_url_http.clone(),
7175 config.http_timeout_secs,
7176 config.http_proxy_url.clone(),
7177 config.is_testnet,
7178 None,
7179 )
7180 .unwrap();
7181
7182 let client =
7183 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7184
7185 let instrument = create_test_instrument_any();
7186 let instrument_id = instrument.id();
7187 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7188 client.instruments.insert(symbol_key, instrument);
7189
7190 let request = RequestTrades::new(
7191 instrument_id,
7192 None,
7193 None,
7194 None,
7195 Some(client_id),
7196 UUID4::new(),
7197 get_atomic_clock_realtime().get_time_ns(),
7198 None,
7199 );
7200
7201 assert!(client.request_trades(&request).is_ok());
7202
7203 let timeout = tokio::time::Duration::from_millis(500);
7204 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7205 tokio::time::timeout(timeout, rx.recv()).await
7206 {
7207 let trade_ticks = resp.data;
7209 assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7210
7211 for i in 1..trade_ticks.len() {
7213 assert!(
7214 trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7215 "Trades should be ordered by timestamp (ts_event) in ascending order"
7216 );
7217 }
7218
7219 assert!(
7221 trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7222 "First trade should be before second"
7223 );
7224 assert!(
7225 trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7226 "Second trade should be before third"
7227 );
7228 }
7229 }
7230
7231 #[tokio::test]
7232 async fn test_trades_response_all_trade_tick_fields_populated() {
7233 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7235 set_data_event_sender(sender);
7236
7237 let created_at = Utc::now();
7238 let http_trade = crate::http::models::Trade {
7239 id: "field-test".to_string(),
7240 side: OrderSide::Buy,
7241 size: dec!(5.5),
7242 price: dec!(12345.67),
7243 created_at,
7244 created_at_height: 999,
7245 trade_type: crate::common::enums::DydxTradeType::Limit,
7246 };
7247
7248 let trades_response = crate::http::models::TradesResponse {
7249 trades: vec![http_trade],
7250 };
7251
7252 let state = TradesTestState {
7253 response: Arc::new(trades_response),
7254 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7255 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7256 };
7257
7258 let addr = start_trades_test_server(state).await;
7259 let base_url = format!("http://{addr}");
7260
7261 let client_id = ClientId::from("DYDX-FIELDS-TEST");
7262 let config = DydxDataClientConfig {
7263 base_url_http: Some(base_url),
7264 is_testnet: true,
7265 ..Default::default()
7266 };
7267
7268 let http_client = DydxHttpClient::new(
7269 config.base_url_http.clone(),
7270 config.http_timeout_secs,
7271 config.http_proxy_url.clone(),
7272 config.is_testnet,
7273 None,
7274 )
7275 .unwrap();
7276
7277 let client =
7278 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7279
7280 let instrument = create_test_instrument_any();
7281 let instrument_id = instrument.id();
7282 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7283 client.instruments.insert(symbol_key, instrument);
7284
7285 let request = RequestTrades::new(
7286 instrument_id,
7287 None,
7288 None,
7289 None,
7290 Some(client_id),
7291 UUID4::new(),
7292 get_atomic_clock_realtime().get_time_ns(),
7293 None,
7294 );
7295
7296 assert!(client.request_trades(&request).is_ok());
7297
7298 let timeout = tokio::time::Duration::from_millis(500);
7299 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7300 tokio::time::timeout(timeout, rx.recv()).await
7301 {
7302 assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7303 let tick = &resp.data[0];
7304
7305 assert_eq!(
7307 tick.instrument_id, instrument_id,
7308 "instrument_id should be set"
7309 );
7310 assert!(tick.price.as_f64() > 0.0, "price should be positive");
7311 assert!(tick.size.as_f64() > 0.0, "size should be positive");
7312
7313 match tick.aggressor_side {
7315 AggressorSide::Buyer | AggressorSide::Seller => {
7316 }
7318 AggressorSide::NoAggressor => {
7319 panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7320 }
7321 }
7322
7323 assert!(
7325 !tick.trade_id.to_string().is_empty(),
7326 "trade_id should be set"
7327 );
7328
7329 assert!(tick.ts_event > 0, "ts_event should be set");
7331 assert!(tick.ts_init > 0, "ts_init should be set");
7332 assert!(
7333 tick.ts_init >= tick.ts_event,
7334 "ts_init should be >= ts_event"
7335 );
7336 }
7337 }
7338
7339 #[tokio::test]
7340 async fn test_trades_response_includes_metadata() {
7341 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7343 set_data_event_sender(sender);
7344
7345 let created_at = Utc::now();
7346 let http_trade = crate::http::models::Trade {
7347 id: "metadata-test".to_string(),
7348 side: OrderSide::Buy,
7349 size: dec!(1.0),
7350 price: dec!(100.0),
7351 created_at,
7352 created_at_height: 100,
7353 trade_type: crate::common::enums::DydxTradeType::Limit,
7354 };
7355
7356 let trades_response = crate::http::models::TradesResponse {
7357 trades: vec![http_trade],
7358 };
7359
7360 let state = TradesTestState {
7361 response: Arc::new(trades_response),
7362 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7363 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7364 };
7365
7366 let addr = start_trades_test_server(state).await;
7367 let base_url = format!("http://{addr}");
7368
7369 let client_id = ClientId::from("DYDX-META-TEST");
7370 let config = DydxDataClientConfig {
7371 base_url_http: Some(base_url),
7372 is_testnet: true,
7373 ..Default::default()
7374 };
7375
7376 let http_client = DydxHttpClient::new(
7377 config.base_url_http.clone(),
7378 config.http_timeout_secs,
7379 config.http_proxy_url.clone(),
7380 config.is_testnet,
7381 None,
7382 )
7383 .unwrap();
7384
7385 let client =
7386 DydxDataClient::new(client_id, config, http_client, create_test_ws_client()).unwrap();
7387
7388 let instrument = create_test_instrument_any();
7389 let instrument_id = instrument.id();
7390 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7391 client.instruments.insert(symbol_key, instrument);
7392
7393 let request_id = UUID4::new();
7394 let request = RequestTrades::new(
7395 instrument_id,
7396 None,
7397 None,
7398 None,
7399 Some(client_id),
7400 request_id,
7401 get_atomic_clock_realtime().get_time_ns(),
7402 None,
7403 );
7404
7405 assert!(client.request_trades(&request).is_ok());
7406
7407 let timeout = tokio::time::Duration::from_millis(500);
7408 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7409 tokio::time::timeout(timeout, rx.recv()).await
7410 {
7411 assert_eq!(
7413 resp.correlation_id, request_id,
7414 "correlation_id should match request"
7415 );
7416 assert_eq!(resp.client_id, client_id, "client_id should be set");
7417 assert_eq!(
7418 resp.instrument_id, instrument_id,
7419 "instrument_id should be set"
7420 );
7421 assert!(resp.ts_init > 0, "ts_init should be set");
7422
7423 let _start = resp.start;
7424 let _end = resp.end;
7425 let _params = resp.params;
7426 }
7427 }
7428
7429 #[tokio::test]
7430 async fn test_orderbook_cache_growth_with_many_instruments() {
7431 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7432 set_data_event_sender(sender);
7433
7434 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7435 let config = DydxDataClientConfig {
7436 base_url_http: Some(base_url),
7437 is_testnet: true,
7438 ..Default::default()
7439 };
7440
7441 let http_client = DydxHttpClient::new(
7442 config.base_url_http.clone(),
7443 config.http_timeout_secs,
7444 config.http_proxy_url.clone(),
7445 config.is_testnet,
7446 None,
7447 )
7448 .unwrap();
7449
7450 let client = DydxDataClient::new(
7451 ClientId::from("dydx_test"),
7452 config,
7453 http_client,
7454 create_test_ws_client(),
7455 )
7456 .unwrap();
7457
7458 let initial_capacity = client.order_books.capacity();
7459
7460 for i in 0..100 {
7461 let symbol = format!("INSTRUMENT-{i}");
7462 let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7463 client.order_books.insert(
7464 instrument_id,
7465 OrderBook::new(instrument_id, BookType::L2_MBP),
7466 );
7467 }
7468
7469 assert_eq!(client.order_books.len(), 100);
7470 assert!(client.order_books.capacity() >= initial_capacity);
7471
7472 client.order_books.clear();
7473 assert_eq!(client.order_books.len(), 0);
7474 }
7475
7476 #[rstest]
7477 fn test_instrument_id_validation_rejects_invalid_formats() {
7478 let test_cases = vec![
7480 ("", "Empty string missing separator"),
7481 ("INVALID", "No venue separator"),
7482 ("NO-VENUE", "No venue separator"),
7483 (".DYDX", "Empty symbol"),
7484 ("SYMBOL.", "Empty venue"),
7485 ];
7486
7487 for (invalid_id, description) in test_cases {
7488 let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7489 assert!(
7490 result.is_err(),
7491 "Expected {invalid_id} to panic: {description}"
7492 );
7493 }
7494 }
7495
7496 #[rstest]
7497 fn test_instrument_id_validation_accepts_valid_formats() {
7498 let valid_ids = vec![
7499 "BTC-USD-PERP.DYDX",
7500 "ETH-USD-PERP.DYDX",
7501 "SOL-USD.DYDX",
7502 "AVAX-USD-PERP.DYDX",
7503 ];
7504
7505 for valid_id in valid_ids {
7506 let instrument_id = InstrumentId::from(valid_id);
7507 assert!(
7508 !instrument_id.symbol.as_str().is_empty()
7509 && !instrument_id.venue.as_str().is_empty(),
7510 "Expected {valid_id} to have non-empty symbol and venue"
7511 );
7512 }
7513 }
7514
7515 #[tokio::test]
7516 async fn test_request_bars_with_inverted_date_range() {
7517 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7518 set_data_event_sender(sender);
7519
7520 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7521 let config = DydxDataClientConfig {
7522 base_url_http: Some(base_url),
7523 is_testnet: true,
7524 ..Default::default()
7525 };
7526
7527 let http_client = DydxHttpClient::new(
7528 config.base_url_http.clone(),
7529 config.http_timeout_secs,
7530 config.http_proxy_url.clone(),
7531 config.is_testnet,
7532 None,
7533 )
7534 .unwrap();
7535
7536 let client = DydxDataClient::new(
7537 ClientId::from("dydx_test"),
7538 config,
7539 http_client,
7540 create_test_ws_client(),
7541 )
7542 .unwrap();
7543
7544 let instrument = create_test_instrument_any();
7545 let instrument_id = instrument.id();
7546 client
7547 .instruments
7548 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7549
7550 let spec = BarSpecification {
7551 step: std::num::NonZeroUsize::new(1).unwrap(),
7552 aggregation: BarAggregation::Minute,
7553 price_type: PriceType::Last,
7554 };
7555 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7556
7557 let now = Utc::now();
7558 let start = Some(now);
7559 let end = Some(now - chrono::Duration::hours(1));
7560
7561 let request = RequestBars::new(
7562 bar_type,
7563 start,
7564 end,
7565 None,
7566 Some(ClientId::from("dydx_test")),
7567 UUID4::new(),
7568 get_atomic_clock_realtime().get_time_ns(),
7569 None,
7570 );
7571
7572 let result = client.request_bars(&request);
7573 assert!(result.is_ok());
7574 }
7575
7576 #[tokio::test]
7577 async fn test_request_bars_with_zero_limit() {
7578 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7579 set_data_event_sender(sender);
7580
7581 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7582 let config = DydxDataClientConfig {
7583 base_url_http: Some(base_url),
7584 is_testnet: true,
7585 ..Default::default()
7586 };
7587
7588 let http_client = DydxHttpClient::new(
7589 config.base_url_http.clone(),
7590 config.http_timeout_secs,
7591 config.http_proxy_url.clone(),
7592 config.is_testnet,
7593 None,
7594 )
7595 .unwrap();
7596
7597 let client = DydxDataClient::new(
7598 ClientId::from("dydx_test"),
7599 config,
7600 http_client,
7601 create_test_ws_client(),
7602 )
7603 .unwrap();
7604
7605 let instrument = create_test_instrument_any();
7606 let instrument_id = instrument.id();
7607 client
7608 .instruments
7609 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7610
7611 let spec = BarSpecification {
7612 step: std::num::NonZeroUsize::new(1).unwrap(),
7613 aggregation: BarAggregation::Minute,
7614 price_type: PriceType::Last,
7615 };
7616 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7617
7618 let request = RequestBars::new(
7619 bar_type,
7620 None,
7621 None,
7622 Some(std::num::NonZeroUsize::new(1).unwrap()),
7623 Some(ClientId::from("dydx_test")),
7624 UUID4::new(),
7625 get_atomic_clock_realtime().get_time_ns(),
7626 None,
7627 );
7628
7629 let result = client.request_bars(&request);
7630 assert!(result.is_ok());
7631 }
7632
7633 #[tokio::test]
7634 async fn test_request_trades_with_excessive_limit() {
7635 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7636 set_data_event_sender(sender);
7637
7638 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7639 let config = DydxDataClientConfig {
7640 base_url_http: Some(base_url),
7641 is_testnet: true,
7642 ..Default::default()
7643 };
7644
7645 let http_client = DydxHttpClient::new(
7646 config.base_url_http.clone(),
7647 config.http_timeout_secs,
7648 config.http_proxy_url.clone(),
7649 config.is_testnet,
7650 None,
7651 )
7652 .unwrap();
7653
7654 let client = DydxDataClient::new(
7655 ClientId::from("dydx_test"),
7656 config,
7657 http_client,
7658 create_test_ws_client(),
7659 )
7660 .unwrap();
7661
7662 let instrument = create_test_instrument_any();
7663 let instrument_id = instrument.id();
7664 client
7665 .instruments
7666 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7667
7668 let request = RequestTrades::new(
7669 instrument_id,
7670 None,
7671 None,
7672 Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7673 Some(ClientId::from("dydx_test")),
7674 UUID4::new(),
7675 get_atomic_clock_realtime().get_time_ns(),
7676 None,
7677 );
7678
7679 let result = client.request_trades(&request);
7680 assert!(result.is_ok());
7681 }
7682
7683 #[rstest]
7684 fn test_candle_topic_format() {
7685 let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7686 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7687 let resolution = "1MIN";
7688 let topic = format!("{ticker}/{resolution}");
7689
7690 assert_eq!(topic, "BTC-USD/1MIN");
7691 assert!(!topic.contains("-PERP"));
7692 assert!(!topic.contains(".DYDX"));
7693 }
7694}