1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26 messages::{
27 DataEvent, DataResponse,
28 data::{
29 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
30 RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
31 SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
32 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
33 UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
34 UnsubscribeQuotes, UnsubscribeTrades,
35 },
36 },
37 runner::get_data_event_sender,
38};
39use nautilus_core::{
40 UnixNanos,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_data::client::DataClient;
44use nautilus_model::{
45 data::{
46 Bar, BarType, BookOrder, Data as NautilusData, OrderBookDelta, OrderBookDeltas_API,
47 QuoteTick,
48 },
49 enums::{BookAction, OrderSide, RecordFlag},
50 identifiers::{ClientId, InstrumentId, Venue},
51 instruments::{Instrument, InstrumentAny},
52 orderbook::OrderBook,
53 types::{Price, Quantity, price::PriceRaw},
54};
55use tokio::{task::JoinHandle, time::Duration};
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60 common::consts::DYDX_VENUE, config::DydxDataClientConfig, http::client::DydxHttpClient,
61 websocket::client::DydxWebSocketClient,
62};
63
64struct WsMessageContext<'a> {
66 data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
67 instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
68 order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
69 last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
70 ws_client: &'a Option<DydxWebSocketClient>,
71 active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
72 active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
73 active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
74}
75
76#[derive(Debug)]
84pub struct DydxDataClient {
85 client_id: ClientId,
87 config: DydxDataClientConfig,
89 http_client: DydxHttpClient,
91 ws_client: Option<DydxWebSocketClient>,
93 is_connected: AtomicBool,
95 cancellation_token: CancellationToken,
97 tasks: Vec<JoinHandle<()>>,
99 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
101 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
103 clock: &'static AtomicTime,
105 #[allow(dead_code)]
107 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
108 #[allow(dead_code)]
110 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
111 #[allow(dead_code)]
113 incomplete_bars: Arc<DashMap<BarType, Bar>>,
114 #[allow(dead_code)]
116 bar_type_mappings: Arc<DashMap<String, BarType>>,
117 active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
119 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
121 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
123}
124
125impl DydxDataClient {
126 #[allow(dead_code)]
132 fn map_bar_spec_to_resolution(
133 spec: &nautilus_model::data::BarSpecification,
134 ) -> anyhow::Result<&'static str> {
135 use nautilus_model::enums::BarAggregation;
136
137 match spec.step.get() {
138 1 => match spec.aggregation {
139 BarAggregation::Minute => Ok("1MIN"),
140 BarAggregation::Hour => Ok("1HOUR"),
141 BarAggregation::Day => Ok("1DAY"),
142 _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143 },
144 5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145 15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146 30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147 4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148 step => anyhow::bail!(
149 "Unsupported bar step: {step} with aggregation {:?}",
150 spec.aggregation
151 ),
152 }
153 }
154
155 pub fn new(
161 client_id: ClientId,
162 config: DydxDataClientConfig,
163 http_client: DydxHttpClient,
164 ws_client: Option<DydxWebSocketClient>,
165 ) -> anyhow::Result<Self> {
166 let clock = get_atomic_clock_realtime();
167 let data_sender = get_data_event_sender();
168
169 let instruments_cache = http_client.instruments().clone();
171
172 Ok(Self {
173 client_id,
174 config,
175 http_client,
176 ws_client,
177 is_connected: AtomicBool::new(false),
178 cancellation_token: CancellationToken::new(),
179 tasks: Vec::new(),
180 data_sender,
181 instruments: instruments_cache,
182 clock,
183 order_books: Arc::new(DashMap::new()),
184 last_quotes: Arc::new(DashMap::new()),
185 incomplete_bars: Arc::new(DashMap::new()),
186 bar_type_mappings: Arc::new(DashMap::new()),
187 active_orderbook_subs: Arc::new(DashMap::new()),
188 active_trade_subs: Arc::new(DashMap::new()),
189 active_bar_subs: Arc::new(DashMap::new()),
190 })
191 }
192
193 #[must_use]
195 pub fn venue(&self) -> Venue {
196 *DYDX_VENUE
197 }
198
199 fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200 self.ws_client
201 .as_ref()
202 .context("websocket client not initialized; call connect first")
203 }
204
205 #[allow(dead_code)]
206 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207 self.ws_client
208 .as_mut()
209 .context("websocket client not initialized; call connect first")
210 }
211
212 #[must_use]
214 pub fn is_connected(&self) -> bool {
215 self.is_connected.load(Ordering::Relaxed)
216 }
217
218 fn spawn_ws<F>(&self, fut: F, context: &'static str)
222 where
223 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224 {
225 tokio::spawn(async move {
226 if let Err(e) = fut.await {
227 tracing::error!("{context}: {e:?}");
228 }
229 });
230 }
231
232 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247 tracing::info!("Bootstrapping dYdX instruments");
248
249 let instruments = self
252 .http_client
253 .request_instruments(None, None, None)
254 .await
255 .context("failed to load instruments from dYdX")?;
256
257 if instruments.is_empty() {
258 tracing::warn!("No dYdX instruments were loaded");
259 return Ok(instruments);
260 }
261
262 tracing::info!("Loaded {} dYdX instruments", instruments.len());
263
264 self.http_client.cache_instruments(instruments.clone());
266
267 if let Some(ref ws) = self.ws_client {
269 ws.cache_instruments(instruments.clone());
270 }
271
272 Ok(instruments)
273 }
274}
275
276#[async_trait::async_trait]
278impl DataClient for DydxDataClient {
279 fn client_id(&self) -> ClientId {
280 self.client_id
281 }
282
283 fn venue(&self) -> Option<Venue> {
284 Some(*DYDX_VENUE)
285 }
286
287 fn start(&mut self) -> anyhow::Result<()> {
288 tracing::info!(
289 client_id = %self.client_id,
290 is_testnet = self.http_client.is_testnet(),
291 "Starting dYdX data client"
292 );
293 Ok(())
294 }
295
296 fn stop(&mut self) -> anyhow::Result<()> {
297 tracing::info!("Stopping dYdX data client {}", self.client_id);
298 self.cancellation_token.cancel();
299 self.is_connected.store(false, Ordering::Relaxed);
300 Ok(())
301 }
302
303 fn reset(&mut self) -> anyhow::Result<()> {
304 tracing::debug!("Resetting dYdX data client {}", self.client_id);
305 self.is_connected.store(false, Ordering::Relaxed);
306 self.cancellation_token = CancellationToken::new();
307 self.tasks.clear();
308 Ok(())
309 }
310
311 fn dispose(&mut self) -> anyhow::Result<()> {
312 tracing::debug!("Disposing dYdX data client {}", self.client_id);
313 self.stop()
314 }
315
316 async fn connect(&mut self) -> anyhow::Result<()> {
317 if self.is_connected() {
318 return Ok(());
319 }
320
321 tracing::info!("Connecting dYdX data client");
322
323 self.bootstrap_instruments().await?;
325
326 if let Some(ref mut ws) = self.ws_client {
328 ws.connect()
329 .await
330 .context("failed to connect dYdX websocket")?;
331
332 ws.subscribe_markets()
333 .await
334 .context("failed to subscribe to markets channel")?;
335
336 if let Some(rx) = ws.take_receiver() {
338 let data_tx = self.data_sender.clone();
339 let instruments = self.instruments.clone();
340 let order_books = self.order_books.clone();
341 let last_quotes = self.last_quotes.clone();
342 let ws_client = self.ws_client.clone();
343 let active_orderbook_subs = self.active_orderbook_subs.clone();
344 let active_trade_subs = self.active_trade_subs.clone();
345 let active_bar_subs = self.active_bar_subs.clone();
346
347 let task = tokio::spawn(async move {
348 let mut rx = rx;
349 while let Some(msg) = rx.recv().await {
350 let ctx = WsMessageContext {
351 data_sender: &data_tx,
352 instruments: &instruments,
353 order_books: &order_books,
354 last_quotes: &last_quotes,
355 ws_client: &ws_client,
356 active_orderbook_subs: &active_orderbook_subs,
357 active_trade_subs: &active_trade_subs,
358 active_bar_subs: &active_bar_subs,
359 };
360 Self::handle_ws_message(msg, &ctx);
361 }
362 });
363 self.tasks.push(task);
364 } else {
365 tracing::warn!("No inbound WS receiver available after connect");
366 }
367 }
368
369 self.start_orderbook_refresh_task()?;
371
372 self.start_instrument_refresh_task()?;
374
375 self.is_connected.store(true, Ordering::Relaxed);
376 tracing::info!("Connected dYdX data client");
377
378 Ok(())
379 }
380
381 async fn disconnect(&mut self) -> anyhow::Result<()> {
382 if !self.is_connected() {
383 return Ok(());
384 }
385
386 tracing::info!("Disconnecting dYdX data client");
387
388 if let Some(ref mut ws) = self.ws_client {
390 ws.disconnect()
391 .await
392 .context("failed to disconnect dYdX websocket")?;
393 }
394
395 self.is_connected.store(false, Ordering::Relaxed);
396 tracing::info!("Disconnected dYdX data client");
397
398 Ok(())
399 }
400
401 fn is_connected(&self) -> bool {
402 self.is_connected.load(Ordering::Relaxed)
403 }
404
405 fn is_disconnected(&self) -> bool {
406 !self.is_connected()
407 }
408
409 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
410 tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
414 Ok(())
415 }
416
417 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
418 tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
421 Ok(())
422 }
423
424 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
425 tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
428 Ok(())
429 }
430
431 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
432 tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
435 Ok(())
436 }
437
438 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
439 let ws = self.ws_client()?.clone();
440 let instrument_id = cmd.instrument_id;
441
442 self.active_trade_subs.insert(instrument_id, ());
444
445 self.spawn_ws(
446 async move {
447 ws.subscribe_trades(instrument_id)
448 .await
449 .context("trade subscription")
450 },
451 "dYdX trade subscription",
452 );
453
454 Ok(())
455 }
456
457 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
458 use nautilus_model::enums::BookType;
459
460 if cmd.book_type != BookType::L2_MBP {
461 anyhow::bail!(
462 "dYdX only supports L2_MBP order book deltas, received {:?}",
463 cmd.book_type
464 );
465 }
466
467 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
469
470 self.active_orderbook_subs.insert(cmd.instrument_id, ());
472
473 let ws = self.ws_client()?.clone();
474 let instrument_id = cmd.instrument_id;
475
476 self.spawn_ws(
477 async move {
478 ws.subscribe_orderbook(instrument_id)
479 .await
480 .context("orderbook subscription")
481 },
482 "dYdX orderbook subscription",
483 );
484
485 Ok(())
486 }
487
488 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
489 use nautilus_model::enums::BookType;
490
491 if cmd.book_type != BookType::L2_MBP {
492 anyhow::bail!(
493 "dYdX only supports L2_MBP order book snapshots, received {:?}",
494 cmd.book_type
495 );
496 }
497
498 self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501 let ws = self.ws_client()?.clone();
502 let instrument_id = cmd.instrument_id;
503
504 tokio::spawn(async move {
505 if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506 tracing::error!(
507 "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508 );
509 }
510 });
511
512 Ok(())
513 }
514
515 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516 tracing::debug!(
519 "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520 cmd.instrument_id
521 );
522
523 use nautilus_common::messages::data::SubscribeBookDeltas;
525 use nautilus_model::enums::BookType;
526
527 let book_cmd = SubscribeBookDeltas {
528 client_id: cmd.client_id,
529 venue: cmd.venue,
530 instrument_id: cmd.instrument_id,
531 book_type: BookType::L2_MBP,
532 depth: None,
533 managed: false,
534 params: None,
535 command_id: cmd.command_id,
536 ts_init: cmd.ts_init,
537 };
538
539 self.subscribe_book_deltas(&book_cmd)
540 }
541
542 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
543 let ws = self.ws_client()?.clone();
544 let instrument_id = cmd.bar_type.instrument_id();
545 let spec = cmd.bar_type.spec();
546
547 let resolution = match spec.step.get() {
549 1 => match spec.aggregation {
550 nautilus_model::enums::BarAggregation::Minute => "1MIN",
551 nautilus_model::enums::BarAggregation::Hour => "1HOUR",
552 nautilus_model::enums::BarAggregation::Day => "1DAY",
553 _ => {
554 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
555 }
556 },
557 5 => {
558 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
559 "5MINS"
560 } else {
561 anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
562 }
563 }
564 15 => {
565 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
566 "15MINS"
567 } else {
568 anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
569 }
570 }
571 30 => {
572 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
573 "30MINS"
574 } else {
575 anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
576 }
577 }
578 4 => {
579 if spec.aggregation == nautilus_model::enums::BarAggregation::Hour {
580 "4HOURS"
581 } else {
582 anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
583 }
584 }
585 step => {
586 anyhow::bail!("Unsupported bar step: {step}");
587 }
588 };
589
590 let bar_type = cmd.bar_type;
592 self.active_bar_subs
593 .insert((instrument_id, resolution.to_string()), bar_type);
594
595 self.spawn_ws(
596 async move {
597 let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
599 let topic = format!("{ticker}/{resolution}");
600 if let Err(e) =
601 ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
602 topic,
603 bar_type,
604 })
605 {
606 anyhow::bail!("Failed to register bar type: {e}");
607 }
608
609 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
611
612 ws.subscribe_candles(instrument_id, resolution)
613 .await
614 .context("candles subscription")
615 },
616 "dYdX candles subscription",
617 );
618
619 Ok(())
620 }
621
622 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
623 self.active_trade_subs.remove(&cmd.instrument_id);
625
626 let ws = self.ws_client()?.clone();
627 let instrument_id = cmd.instrument_id;
628
629 self.spawn_ws(
630 async move {
631 ws.unsubscribe_trades(instrument_id)
632 .await
633 .context("trade unsubscription")
634 },
635 "dYdX trade unsubscription",
636 );
637
638 Ok(())
639 }
640
641 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
642 self.active_orderbook_subs.remove(&cmd.instrument_id);
644
645 let ws = self.ws_client()?.clone();
646 let instrument_id = cmd.instrument_id;
647
648 self.spawn_ws(
649 async move {
650 ws.unsubscribe_orderbook(instrument_id)
651 .await
652 .context("orderbook unsubscription")
653 },
654 "dYdX orderbook unsubscription",
655 );
656
657 Ok(())
658 }
659
660 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
661 self.active_orderbook_subs.remove(&cmd.instrument_id);
665
666 let ws = self.ws_client()?.clone();
667 let instrument_id = cmd.instrument_id;
668
669 self.spawn_ws(
670 async move {
671 ws.unsubscribe_orderbook(instrument_id)
672 .await
673 .context("orderbook snapshot unsubscription")
674 },
675 "dYdX orderbook snapshot unsubscription",
676 );
677
678 Ok(())
679 }
680
681 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
682 tracing::debug!(
684 "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
685 cmd.instrument_id
686 );
687
688 let book_cmd = UnsubscribeBookDeltas {
689 instrument_id: cmd.instrument_id,
690 client_id: cmd.client_id,
691 venue: cmd.venue,
692 command_id: cmd.command_id,
693 ts_init: cmd.ts_init,
694 params: cmd.params.clone(),
695 };
696
697 self.unsubscribe_book_deltas(&book_cmd)
698 }
699
700 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
701 let ws = self.ws_client()?.clone();
702 let instrument_id = cmd.bar_type.instrument_id();
703 let spec = cmd.bar_type.spec();
704
705 let resolution = match spec.step.get() {
707 1 => match spec.aggregation {
708 nautilus_model::enums::BarAggregation::Minute => "1MIN",
709 nautilus_model::enums::BarAggregation::Hour => "1HOUR",
710 nautilus_model::enums::BarAggregation::Day => "1DAY",
711 _ => {
712 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
713 }
714 },
715 5 => {
716 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
717 "5MINS"
718 } else {
719 anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
720 }
721 }
722 15 => {
723 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
724 "15MINS"
725 } else {
726 anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
727 }
728 }
729 30 => {
730 if spec.aggregation == nautilus_model::enums::BarAggregation::Minute {
731 "30MINS"
732 } else {
733 anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
734 }
735 }
736 4 => {
737 if spec.aggregation == nautilus_model::enums::BarAggregation::Hour {
738 "4HOURS"
739 } else {
740 anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
741 }
742 }
743 step => {
744 anyhow::bail!("Unsupported bar step: {step}");
745 }
746 };
747
748 self.active_bar_subs
750 .remove(&(instrument_id, resolution.to_string()));
751
752 let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
754 let topic = format!("{ticker}/{resolution}");
755 if let Err(e) =
756 ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
757 {
758 tracing::warn!("Failed to unregister bar type: {e}");
759 }
760
761 self.spawn_ws(
762 async move {
763 ws.unsubscribe_candles(instrument_id, resolution)
764 .await
765 .context("candles unsubscription")
766 },
767 "dYdX candles unsubscription",
768 );
769
770 Ok(())
771 }
772
773 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
774 let instruments_cache = self.instruments.clone();
775 let sender = self.data_sender.clone();
776 let http = self.http_client.clone();
777 let instrument_id = request.instrument_id;
778 let request_id = request.request_id;
779 let client_id = request.client_id.unwrap_or(self.client_id);
780 let start = request.start;
781 let end = request.end;
782 let params = request.params.clone();
783 let clock = self.clock;
784 let start_nanos = datetime_to_unix_nanos(start);
785 let end_nanos = datetime_to_unix_nanos(end);
786
787 tokio::spawn(async move {
788 let symbol = Ustr::from(instrument_id.symbol.as_str());
790 let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
791 tracing::debug!("Found instrument {instrument_id} in cache");
792 Some(cached.clone())
793 } else {
794 tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
796 match http.request_instruments(None, None, None).await {
797 Ok(instruments) => {
798 for inst in &instruments {
800 upsert_instrument(&instruments_cache, inst.clone());
801 }
802 instruments.into_iter().find(|i| i.id() == instrument_id)
804 }
805 Err(e) => {
806 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
807 None
808 }
809 }
810 };
811
812 if let Some(inst) = instrument {
813 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
814 request_id,
815 client_id,
816 instrument_id,
817 inst,
818 start_nanos,
819 end_nanos,
820 clock.get_time_ns(),
821 params,
822 )));
823
824 if let Err(e) = sender.send(DataEvent::Response(response)) {
825 tracing::error!("Failed to send instrument response: {e}");
826 }
827 } else {
828 tracing::error!("Instrument {instrument_id} not found");
829 }
830 });
831
832 Ok(())
833 }
834
835 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
836 let http = self.http_client.clone();
837 let sender = self.data_sender.clone();
838 let instruments_cache = self.instruments.clone();
839 let request_id = request.request_id;
840 let client_id = request.client_id.unwrap_or(self.client_id);
841 let venue = self.venue();
842 let start = request.start;
843 let end = request.end;
844 let params = request.params.clone();
845 let clock = self.clock;
846 let start_nanos = datetime_to_unix_nanos(start);
847 let end_nanos = datetime_to_unix_nanos(end);
848
849 tokio::spawn(async move {
850 match http.request_instruments(None, None, None).await {
851 Ok(instruments) => {
852 tracing::info!("Fetched {} instruments from dYdX", instruments.len());
853
854 for instrument in &instruments {
856 upsert_instrument(&instruments_cache, instrument.clone());
857 }
858
859 let response = DataResponse::Instruments(InstrumentsResponse::new(
860 request_id,
861 client_id,
862 venue,
863 instruments,
864 start_nanos,
865 end_nanos,
866 clock.get_time_ns(),
867 params,
868 ));
869
870 if let Err(e) = sender.send(DataEvent::Response(response)) {
871 tracing::error!("Failed to send instruments response: {e}");
872 }
873 }
874 Err(e) => {
875 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
876
877 let response = DataResponse::Instruments(InstrumentsResponse::new(
879 request_id,
880 client_id,
881 venue,
882 Vec::new(),
883 start_nanos,
884 end_nanos,
885 clock.get_time_ns(),
886 params,
887 ));
888
889 if let Err(e) = sender.send(DataEvent::Response(response)) {
890 tracing::error!("Failed to send empty instruments response: {e}");
891 }
892 }
893 }
894 });
895
896 Ok(())
897 }
898
899 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
900 use nautilus_model::{
901 data::TradeTick,
902 enums::{AggressorSide, OrderSide},
903 identifiers::TradeId,
904 };
905
906 let http = self.http_client.clone();
907 let instruments = self.instruments.clone();
908 let sender = self.data_sender.clone();
909 let instrument_id = request.instrument_id;
910 let start = request.start;
911 let end = request.end;
912 let limit = request.limit.map(|n| n.get() as u32);
913 let request_id = request.request_id;
914 let client_id = request.client_id.unwrap_or(self.client_id);
915 let params = request.params.clone();
916 let clock = self.clock;
917 let start_nanos = datetime_to_unix_nanos(start);
918 let end_nanos = datetime_to_unix_nanos(end);
919
920 tokio::spawn(async move {
921 let ticker = instrument_id
925 .symbol
926 .as_str()
927 .trim_end_matches("-PERP")
928 .to_string();
929
930 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
932 Some(inst) => inst.clone(),
933 None => {
934 tracing::error!(
935 "request_trades: instrument {} not found in cache; cannot convert trades",
936 instrument_id
937 );
938 let ts_now = clock.get_time_ns();
939 let response = DataResponse::Trades(TradesResponse::new(
940 request_id,
941 client_id,
942 instrument_id,
943 Vec::new(),
944 start_nanos,
945 end_nanos,
946 ts_now,
947 params,
948 ));
949 if let Err(e) = sender.send(DataEvent::Response(response)) {
950 tracing::error!("Failed to send empty trades response: {e}");
951 }
952 return;
953 }
954 };
955
956 let price_precision = instrument.price_precision();
957 let size_precision = instrument.size_precision();
958
959 match http
960 .inner
961 .get_trades(&ticker, limit)
962 .await
963 .context("failed to request trades from dYdX")
964 {
965 Ok(trades_response) => {
966 let mut ticks = Vec::new();
967
968 for trade in trades_response.trades {
969 let aggressor_side = match trade.side {
970 OrderSide::Buy => AggressorSide::Buyer,
971 OrderSide::Sell => AggressorSide::Seller,
972 _ => continue, };
974
975 let price = match Price::from_decimal_dp(trade.price, price_precision) {
976 Ok(p) => p,
977 Err(e) => {
978 tracing::warn!(
979 "request_trades: failed to convert price for trade {}: {e}",
980 trade.id
981 );
982 continue;
983 }
984 };
985
986 let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
987 Ok(q) => q,
988 Err(e) => {
989 tracing::warn!(
990 "request_trades: failed to convert size for trade {}: {e}",
991 trade.id
992 );
993 continue;
994 }
995 };
996
997 let ts_event = match trade.created_at.timestamp_nanos_opt() {
998 Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
999 _ => {
1000 tracing::warn!(
1001 "request_trades: timestamp out of range for trade {}",
1002 trade.id
1003 );
1004 continue;
1005 }
1006 };
1007
1008 if let Some(start_ts) = start_nanos
1010 && ts_event < start_ts
1011 {
1012 continue;
1013 }
1014 if let Some(end_ts) = end_nanos
1015 && ts_event > end_ts
1016 {
1017 continue;
1018 }
1019
1020 let tick = TradeTick::new(
1021 instrument_id,
1022 price,
1023 size,
1024 aggressor_side,
1025 TradeId::new(&trade.id),
1026 ts_event,
1027 clock.get_time_ns(),
1028 );
1029 ticks.push(tick);
1030 }
1031
1032 let response = DataResponse::Trades(TradesResponse::new(
1033 request_id,
1034 client_id,
1035 instrument_id,
1036 ticks,
1037 start_nanos,
1038 end_nanos,
1039 clock.get_time_ns(),
1040 params,
1041 ));
1042
1043 if let Err(e) = sender.send(DataEvent::Response(response)) {
1044 tracing::error!("Failed to send trades response: {e}");
1045 }
1046 }
1047 Err(e) => {
1048 tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1049
1050 let response = DataResponse::Trades(TradesResponse::new(
1051 request_id,
1052 client_id,
1053 instrument_id,
1054 Vec::new(),
1055 start_nanos,
1056 end_nanos,
1057 clock.get_time_ns(),
1058 params,
1059 ));
1060
1061 if let Err(e) = sender.send(DataEvent::Response(response)) {
1062 tracing::error!("Failed to send empty trades response: {e}");
1063 }
1064 }
1065 }
1066 });
1067
1068 Ok(())
1069 }
1070
1071 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1072 use chrono::Duration;
1073 use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1074
1075 const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1076
1077 let bar_type = request.bar_type;
1078 let spec = bar_type.spec();
1079
1080 if bar_type.aggregation_source() != AggregationSource::External {
1082 anyhow::bail!(
1083 "dYdX only supports EXTERNAL aggregation, got {:?}",
1084 bar_type.aggregation_source()
1085 );
1086 }
1087
1088 if spec.price_type != PriceType::Last {
1089 anyhow::bail!(
1090 "dYdX only supports LAST price type, got {:?}",
1091 spec.price_type
1092 );
1093 }
1094
1095 let resolution = match spec.step.get() {
1097 1 => match spec.aggregation {
1098 BarAggregation::Minute => "1MIN",
1099 BarAggregation::Hour => "1HOUR",
1100 BarAggregation::Day => "1DAY",
1101 _ => {
1102 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1103 }
1104 },
1105 5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1106 15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1107 30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1108 4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1109 step => {
1110 anyhow::bail!("Unsupported bar step: {step}");
1111 }
1112 };
1113
1114 let http = self.http_client.clone();
1115 let instruments = self.instruments.clone();
1116 let sender = self.data_sender.clone();
1117 let instrument_id = bar_type.instrument_id();
1118 let symbol = instrument_id
1120 .symbol
1121 .as_str()
1122 .trim_end_matches("-PERP")
1123 .to_string();
1124 let request_id = request.request_id;
1125 let client_id = request.client_id.unwrap_or(self.client_id);
1126 let params = request.params.clone();
1127 let clock = self.clock;
1128
1129 let start = request.start;
1130 let end = request.end;
1131 let overall_limit = request.limit.map(|n| n.get() as u32);
1132
1133 let start_nanos = datetime_to_unix_nanos(start);
1135 let end_nanos = datetime_to_unix_nanos(end);
1136
1137 let resolution_enum = match resolution {
1139 "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1140 "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1141 "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1142 "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1143 "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1144 "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1145 "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1146 _ => {
1147 anyhow::bail!("Unsupported resolution: {resolution}");
1148 }
1149 };
1150
1151 tokio::spawn(async move {
1152 let bar_secs: i64 = match spec.aggregation {
1154 BarAggregation::Minute => spec.step.get() as i64 * 60,
1155 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1156 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1157 _ => {
1158 tracing::error!(
1159 "Unsupported aggregation for request_bars: {:?}",
1160 spec.aggregation
1161 );
1162 return;
1163 }
1164 };
1165
1166 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1168 Some(inst) => inst.clone(),
1169 None => {
1170 tracing::error!(
1171 "request_bars: instrument {} not found in cache; cannot convert candles",
1172 instrument_id
1173 );
1174 let ts_now = clock.get_time_ns();
1175 let response = DataResponse::Bars(BarsResponse::new(
1176 request_id,
1177 client_id,
1178 bar_type,
1179 Vec::new(),
1180 start_nanos,
1181 end_nanos,
1182 ts_now,
1183 params,
1184 ));
1185 if let Err(e) = sender.send(DataEvent::Response(response)) {
1186 tracing::error!("Failed to send empty bars response: {e}");
1187 }
1188 return;
1189 }
1190 };
1191
1192 let price_precision = instrument.price_precision();
1193 let size_precision = instrument.size_precision();
1194
1195 let mut all_bars: Vec<Bar> = Vec::new();
1196
1197 let (range_start, range_end) = match (start, end) {
1199 (Some(s), Some(e)) if e > s => (s, e),
1200 _ => {
1201 let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1202 match http
1203 .inner
1204 .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1205 .await
1206 {
1207 Ok(candles_response) => {
1208 tracing::debug!(
1209 "request_bars fetched {} candles without explicit date range",
1210 candles_response.candles.len()
1211 );
1212
1213 for candle in &candles_response.candles {
1214 match Self::candle_to_bar(
1215 candle,
1216 bar_type,
1217 price_precision,
1218 size_precision,
1219 bar_secs,
1220 clock,
1221 ) {
1222 Ok(bar) => all_bars.push(bar),
1223 Err(e) => {
1224 tracing::warn!(
1225 "Failed to convert dYdX candle to bar for {}: {e}",
1226 instrument_id
1227 );
1228 }
1229 }
1230 }
1231
1232 let current_time_ns = clock.get_time_ns();
1233 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1234
1235 let response = DataResponse::Bars(BarsResponse::new(
1236 request_id,
1237 client_id,
1238 bar_type,
1239 all_bars,
1240 start_nanos,
1241 end_nanos,
1242 current_time_ns,
1243 params,
1244 ));
1245
1246 if let Err(e) = sender.send(DataEvent::Response(response)) {
1247 tracing::error!("Failed to send bars response: {e}");
1248 }
1249 }
1250 Err(e) => {
1251 tracing::error!(
1252 "Failed to request candles for {symbol} without date range: {e:?}"
1253 );
1254 }
1255 }
1256 return;
1257 }
1258 };
1259
1260 let total_secs = (range_end - range_start).num_seconds().max(0);
1262 let expected_bars = (total_secs / bar_secs).max(1) as u64;
1263
1264 tracing::debug!(
1265 "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1266 range_start,
1267 range_end,
1268 expected_bars
1269 );
1270
1271 let mut remaining = overall_limit.unwrap_or(u32::MAX);
1272
1273 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1275 let chunk_duration = Duration::seconds(bar_secs * bars_per_call as i64);
1276
1277 let mut chunk_start = range_start;
1278
1279 while chunk_start < range_end && remaining > 0 {
1280 let mut chunk_end = chunk_start + chunk_duration;
1281 if chunk_end > range_end {
1282 chunk_end = range_end;
1283 }
1284
1285 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1286
1287 tracing::debug!(
1288 "request_bars chunk: {} -> {}, limit={}",
1289 chunk_start,
1290 chunk_end,
1291 per_call_limit
1292 );
1293
1294 match http
1295 .inner
1296 .get_candles(
1297 &symbol,
1298 resolution_enum,
1299 Some(per_call_limit),
1300 Some(chunk_start),
1301 Some(chunk_end),
1302 )
1303 .await
1304 {
1305 Ok(candles_response) => {
1306 let count = candles_response.candles.len() as u32;
1307
1308 if count == 0 {
1309 break;
1311 }
1312
1313 for candle in &candles_response.candles {
1315 match Self::candle_to_bar(
1316 candle,
1317 bar_type,
1318 price_precision,
1319 size_precision,
1320 bar_secs,
1321 clock,
1322 ) {
1323 Ok(bar) => all_bars.push(bar),
1324 Err(e) => {
1325 tracing::warn!(
1326 "Failed to convert dYdX candle to bar for {}: {e}",
1327 instrument_id
1328 );
1329 }
1330 }
1331 }
1332
1333 if remaining <= count {
1334 break;
1335 } else {
1336 remaining -= count;
1337 }
1338 }
1339 Err(e) => {
1340 tracing::error!(
1341 "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1342 chunk_start,
1343 chunk_end
1344 );
1345 break;
1346 }
1347 }
1348
1349 chunk_start += chunk_duration;
1350 }
1351
1352 tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1353
1354 let current_time_ns = clock.get_time_ns();
1356 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1357
1358 tracing::debug!(
1359 "request_bars filtered to {} completed bars (current_time_ns={})",
1360 all_bars.len(),
1361 current_time_ns
1362 );
1363
1364 let response = DataResponse::Bars(BarsResponse::new(
1365 request_id,
1366 client_id,
1367 bar_type,
1368 all_bars,
1369 start_nanos,
1370 end_nanos,
1371 current_time_ns,
1372 params,
1373 ));
1374
1375 if let Err(e) = sender.send(DataEvent::Response(response)) {
1376 tracing::error!("Failed to send bars response: {e}");
1377 }
1378 });
1379
1380 Ok(())
1381 }
1382}
1383
1384fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1386 let symbol = Ustr::from(instrument.id().symbol.as_str());
1387 cache.insert(symbol, instrument);
1388}
1389
1390fn datetime_to_unix_nanos(value: Option<chrono::DateTime<chrono::Utc>>) -> Option<UnixNanos> {
1392 value
1393 .and_then(|dt| dt.timestamp_nanos_opt())
1394 .and_then(|nanos| u64::try_from(nanos).ok())
1395 .map(UnixNanos::from)
1396}
1397
1398impl DydxDataClient {
1399 pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1408 let interval_secs = match self.config.instrument_refresh_interval_secs {
1409 Some(secs) if secs > 0 => secs,
1410 _ => {
1411 tracing::info!("Instrument refresh disabled (interval not configured)");
1412 return Ok(());
1413 }
1414 };
1415
1416 let interval = Duration::from_secs(interval_secs);
1417 let http_client = self.http_client.clone();
1418 let instruments_cache = self.instruments.clone();
1419 let cancellation_token = self.cancellation_token.clone();
1420
1421 tracing::info!(
1422 "Starting instrument refresh task (interval: {}s)",
1423 interval_secs
1424 );
1425
1426 let task = tokio::spawn(async move {
1427 let mut interval_timer = tokio::time::interval(interval);
1428 interval_timer.tick().await; loop {
1431 tokio::select! {
1432 _ = cancellation_token.cancelled() => {
1433 tracing::info!("Instrument refresh task cancelled");
1434 break;
1435 }
1436 _ = interval_timer.tick() => {
1437 tracing::debug!("Refreshing instruments");
1438
1439 match http_client.request_instruments(None, None, None).await {
1440 Ok(instruments) => {
1441 tracing::debug!("Refreshed {} instruments", instruments.len());
1442
1443 for instrument in instruments {
1445 upsert_instrument(&instruments_cache, instrument);
1446 }
1447
1448 let all_instruments: Vec<_> = instruments_cache
1450 .iter()
1451 .map(|entry| entry.value().clone())
1452 .collect();
1453 http_client.cache_instruments(all_instruments);
1454 }
1455 Err(e) => {
1456 tracing::error!("Failed to refresh instruments: {}", e);
1457 }
1458 }
1459 }
1460 }
1461 }
1462 });
1463
1464 self.tasks.push(task);
1465 Ok(())
1466 }
1467
1468 fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1478 let interval_secs = match self.config.orderbook_refresh_interval_secs {
1479 Some(secs) if secs > 0 => secs,
1480 _ => {
1481 tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1482 return Ok(());
1483 }
1484 };
1485
1486 let interval = Duration::from_secs(interval_secs);
1487 let http_client = self.http_client.clone();
1488 let instruments = self.instruments.clone();
1489 let order_books = self.order_books.clone();
1490 let active_subs = self.active_orderbook_subs.clone();
1491 let cancellation_token = self.cancellation_token.clone();
1492 let data_sender = self.data_sender.clone();
1493
1494 tracing::info!(
1495 "Starting orderbook snapshot refresh task (interval: {}s)",
1496 interval_secs
1497 );
1498
1499 let task = tokio::spawn(async move {
1500 let mut interval_timer = tokio::time::interval(interval);
1501 interval_timer.tick().await; loop {
1504 tokio::select! {
1505 _ = cancellation_token.cancelled() => {
1506 tracing::info!("Orderbook refresh task cancelled");
1507 break;
1508 }
1509 _ = interval_timer.tick() => {
1510 let active_instruments: Vec<InstrumentId> = active_subs
1511 .iter()
1512 .map(|entry| *entry.key())
1513 .collect();
1514
1515 if active_instruments.is_empty() {
1516 tracing::debug!("No active orderbook subscriptions to refresh");
1517 continue;
1518 }
1519
1520 tracing::debug!(
1521 "Refreshing {} orderbook snapshots",
1522 active_instruments.len()
1523 );
1524
1525 for instrument_id in active_instruments {
1526 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1528 Some(inst) => inst.clone(),
1529 None => {
1530 tracing::warn!(
1531 "Cannot refresh orderbook: no instrument for {}",
1532 instrument_id
1533 );
1534 continue;
1535 }
1536 };
1537
1538 let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1540 let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1541
1542 let snapshot = match snapshot_result {
1543 Ok(s) => s,
1544 Err(e) => {
1545 tracing::error!(
1546 "Failed to fetch orderbook snapshot for {}: {}",
1547 instrument_id,
1548 e
1549 );
1550 continue;
1551 }
1552 };
1553
1554 let deltas_result = Self::parse_orderbook_snapshot(
1556 instrument_id,
1557 &snapshot,
1558 &instrument,
1559 );
1560
1561 let deltas = match deltas_result {
1562 Ok(d) => d,
1563 Err(e) => {
1564 tracing::error!(
1565 "Failed to parse orderbook snapshot for {}: {}",
1566 instrument_id,
1567 e
1568 );
1569 continue;
1570 }
1571 };
1572
1573 if let Some(mut book) = order_books.get_mut(&instrument_id) {
1575 if let Err(e) = book.apply_deltas(&deltas) {
1576 tracing::error!(
1577 "Failed to apply orderbook snapshot for {}: {}",
1578 instrument_id,
1579 e
1580 );
1581 continue;
1582 }
1583
1584 tracing::debug!(
1585 "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1586 instrument_id,
1587 book.best_bid_price(),
1588 book.best_ask_price()
1589 );
1590 }
1591
1592 use nautilus_model::data::OrderBookDeltas_API;
1594 let data = nautilus_model::data::Data::from(OrderBookDeltas_API::new(deltas));
1595 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1596 tracing::error!("Failed to emit orderbook snapshot: {}", e);
1597 }
1598 }
1599 }
1600 }
1601 }
1602 });
1603
1604 self.tasks.push(task);
1605 Ok(())
1606 }
1607
1608 fn parse_orderbook_snapshot(
1612 instrument_id: InstrumentId,
1613 snapshot: &crate::http::models::OrderbookResponse,
1614 instrument: &InstrumentAny,
1615 ) -> anyhow::Result<nautilus_model::data::OrderBookDeltas> {
1616 use nautilus_model::{
1617 data::{BookOrder, OrderBookDelta},
1618 enums::{BookAction, OrderSide, RecordFlag},
1619 instruments::Instrument,
1620 types::{Price, Quantity},
1621 };
1622
1623 let ts_init = get_atomic_clock_realtime().get_time_ns();
1624 let mut deltas = Vec::new();
1625
1626 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1628
1629 let price_precision = instrument.price_precision();
1630 let size_precision = instrument.size_precision();
1631
1632 let bids_len = snapshot.bids.len();
1633 let asks_len = snapshot.asks.len();
1634
1635 for (idx, bid) in snapshot.bids.iter().enumerate() {
1637 let is_last = idx == bids_len - 1 && asks_len == 0;
1638 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1639
1640 let price = Price::from_decimal_dp(bid.price, price_precision)
1641 .context("failed to parse bid price")?;
1642 let size = Quantity::from_decimal_dp(bid.size, size_precision)
1643 .context("failed to parse bid size")?;
1644
1645 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1646 deltas.push(OrderBookDelta::new(
1647 instrument_id,
1648 BookAction::Add,
1649 order,
1650 flags,
1651 0,
1652 ts_init,
1653 ts_init,
1654 ));
1655 }
1656
1657 for (idx, ask) in snapshot.asks.iter().enumerate() {
1659 let is_last = idx == asks_len - 1;
1660 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1661
1662 let price = Price::from_decimal_dp(ask.price, price_precision)
1663 .context("failed to parse ask price")?;
1664 let size = Quantity::from_decimal_dp(ask.size, size_precision)
1665 .context("failed to parse ask size")?;
1666
1667 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1668 deltas.push(OrderBookDelta::new(
1669 instrument_id,
1670 BookAction::Add,
1671 order,
1672 flags,
1673 0,
1674 ts_init,
1675 ts_init,
1676 ));
1677 }
1678
1679 Ok(nautilus_model::data::OrderBookDeltas::new(
1680 instrument_id,
1681 deltas,
1682 ))
1683 }
1684
1685 #[must_use]
1687 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1688 self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1689 }
1690
1691 #[must_use]
1693 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1694 self.instruments.iter().map(|i| i.clone()).collect()
1695 }
1696
1697 fn ensure_order_book(
1698 &self,
1699 instrument_id: InstrumentId,
1700 book_type: nautilus_model::enums::BookType,
1701 ) {
1702 self.order_books
1703 .entry(instrument_id)
1704 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1705 }
1706
1707 fn candle_to_bar(
1714 candle: &crate::http::models::Candle,
1715 bar_type: BarType,
1716 price_precision: u8,
1717 size_precision: u8,
1718 bar_secs: i64,
1719 clock: &AtomicTime,
1720 ) -> anyhow::Result<Bar> {
1721 use anyhow::Context;
1722
1723 let ts_init =
1725 datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1726
1727 let ts_event_ns = ts_init
1729 .as_u64()
1730 .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1731 let ts_event = UnixNanos::from(ts_event_ns);
1732
1733 let open = Price::from_decimal_dp(candle.open, price_precision)
1734 .context("failed to parse candle open price")?;
1735 let high = Price::from_decimal_dp(candle.high, price_precision)
1736 .context("failed to parse candle high price")?;
1737 let low = Price::from_decimal_dp(candle.low, price_precision)
1738 .context("failed to parse candle low price")?;
1739 let close = Price::from_decimal_dp(candle.close, price_precision)
1740 .context("failed to parse candle close price")?;
1741
1742 let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1744 .context("failed to parse candle base_token_volume")?;
1745
1746 Ok(Bar::new(
1747 bar_type, open, high, low, close, volume, ts_event, ts_init,
1748 ))
1749 }
1750
1751 fn handle_ws_message(
1752 message: crate::websocket::messages::NautilusWsMessage,
1753 ctx: &WsMessageContext,
1754 ) {
1755 match message {
1756 crate::websocket::messages::NautilusWsMessage::Data(payloads) => {
1757 Self::handle_data_message(payloads, ctx.data_sender);
1758 }
1759 crate::websocket::messages::NautilusWsMessage::Deltas(deltas) => {
1760 Self::handle_deltas_message(
1761 *deltas,
1762 ctx.data_sender,
1763 ctx.order_books,
1764 ctx.last_quotes,
1765 ctx.instruments,
1766 );
1767 }
1768 crate::websocket::messages::NautilusWsMessage::OraclePrices(oracle_prices) => {
1769 Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1770 }
1771 crate::websocket::messages::NautilusWsMessage::Error(err) => {
1772 tracing::error!("dYdX WS error: {err}");
1773 }
1774 crate::websocket::messages::NautilusWsMessage::Reconnected => {
1775 tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1776
1777 if let Some(ws) = ctx.ws_client {
1779 let total_subs = ctx.active_orderbook_subs.len()
1780 + ctx.active_trade_subs.len()
1781 + ctx.active_bar_subs.len();
1782
1783 if total_subs == 0 {
1784 tracing::debug!("No active subscriptions to restore");
1785 return;
1786 }
1787
1788 tracing::info!(
1789 "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1790 total_subs,
1791 ctx.active_orderbook_subs.len(),
1792 ctx.active_trade_subs.len(),
1793 ctx.active_bar_subs.len()
1794 );
1795
1796 for entry in ctx.active_orderbook_subs.iter() {
1798 let instrument_id = *entry.key();
1799 let ws_clone = ws.clone();
1800 tokio::spawn(async move {
1801 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1802 tracing::error!(
1803 "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1804 );
1805 } else {
1806 tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1807 }
1808 });
1809 }
1810
1811 for entry in ctx.active_trade_subs.iter() {
1813 let instrument_id = *entry.key();
1814 let ws_clone = ws.clone();
1815 tokio::spawn(async move {
1816 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1817 tracing::error!(
1818 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1819 );
1820 } else {
1821 tracing::debug!("Re-subscribed to trades for {instrument_id}");
1822 }
1823 });
1824 }
1825
1826 for entry in ctx.active_bar_subs.iter() {
1828 let (instrument_id, resolution) = entry.key();
1829 let instrument_id = *instrument_id;
1830 let resolution = resolution.clone();
1831 let bar_type = *entry.value();
1832 let ws_clone = ws.clone();
1833
1834 let ticker = instrument_id.symbol.as_str().trim_end_matches(".DYDX");
1836 let topic = format!("{ticker}/{resolution}");
1837 if let Err(e) = ws.send_command(
1838 crate::websocket::handler::HandlerCommand::RegisterBarType {
1839 topic,
1840 bar_type,
1841 },
1842 ) {
1843 tracing::warn!(
1844 "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1845 );
1846 }
1847
1848 tokio::spawn(async move {
1849 if let Err(e) =
1850 ws_clone.subscribe_candles(instrument_id, &resolution).await
1851 {
1852 tracing::error!(
1853 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1854 );
1855 } else {
1856 tracing::debug!(
1857 "Re-subscribed to candles for {instrument_id} ({resolution})"
1858 );
1859 }
1860 });
1861 }
1862
1863 tracing::info!("Completed re-subscription requests after reconnection");
1864 } else {
1865 tracing::warn!("WebSocket client not available for re-subscription");
1866 }
1867 }
1868 crate::websocket::messages::NautilusWsMessage::Order(_)
1869 | crate::websocket::messages::NautilusWsMessage::Fill(_)
1870 | crate::websocket::messages::NautilusWsMessage::Position(_)
1871 | crate::websocket::messages::NautilusWsMessage::AccountState(_)
1872 | crate::websocket::messages::NautilusWsMessage::SubaccountSubscribed(_)
1873 | crate::websocket::messages::NautilusWsMessage::SubaccountsChannelData(_) => {
1874 tracing::debug!(
1875 "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1876 );
1877 }
1878 }
1879 }
1880
1881 fn handle_data_message(
1882 payloads: Vec<NautilusData>,
1883 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1884 ) {
1885 for data in payloads {
1886 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1887 tracing::error!("Failed to emit data event: {e}");
1888 }
1889 }
1890 }
1891
1892 fn resolve_crossed_order_book(
1910 book: &mut OrderBook,
1911 venue_deltas: nautilus_model::data::OrderBookDeltas,
1912 instrument: &InstrumentAny,
1913 ) -> anyhow::Result<nautilus_model::data::OrderBookDeltas> {
1914 let instrument_id = venue_deltas.instrument_id;
1915 let ts_init = venue_deltas.ts_init;
1916 let mut all_deltas = venue_deltas.deltas.clone();
1917
1918 book.apply_deltas(&venue_deltas)?;
1920
1921 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1923 (book.best_bid_price(), book.best_ask_price())
1924 {
1925 bid_price >= ask_price
1926 } else {
1927 false
1928 };
1929
1930 while is_crossed {
1932 tracing::debug!(
1933 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1934 instrument_id,
1935 book.best_bid_price(),
1936 book.best_ask_price()
1937 );
1938
1939 let bid_price = match book.best_bid_price() {
1940 Some(p) => p,
1941 None => break,
1942 };
1943 let ask_price = match book.best_ask_price() {
1944 Some(p) => p,
1945 None => break,
1946 };
1947 let bid_size = match book.best_bid_size() {
1948 Some(s) => s,
1949 None => break,
1950 };
1951 let ask_size = match book.best_ask_size() {
1952 Some(s) => s,
1953 None => break,
1954 };
1955
1956 let mut temp_deltas = Vec::new();
1957
1958 if bid_size > ask_size {
1959 let new_bid_size = Quantity::new(
1961 bid_size.as_f64() - ask_size.as_f64(),
1962 instrument.size_precision(),
1963 );
1964 temp_deltas.push(OrderBookDelta::new(
1965 instrument_id,
1966 BookAction::Update,
1967 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1968 0,
1969 0,
1970 ts_init,
1971 ts_init,
1972 ));
1973 temp_deltas.push(OrderBookDelta::new(
1974 instrument_id,
1975 BookAction::Delete,
1976 BookOrder::new(
1977 OrderSide::Sell,
1978 ask_price,
1979 Quantity::new(0.0, instrument.size_precision()),
1980 0,
1981 ),
1982 0,
1983 0,
1984 ts_init,
1985 ts_init,
1986 ));
1987 } else if bid_size < ask_size {
1988 let new_ask_size = Quantity::new(
1990 ask_size.as_f64() - bid_size.as_f64(),
1991 instrument.size_precision(),
1992 );
1993 temp_deltas.push(OrderBookDelta::new(
1994 instrument_id,
1995 BookAction::Update,
1996 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1997 0,
1998 0,
1999 ts_init,
2000 ts_init,
2001 ));
2002 temp_deltas.push(OrderBookDelta::new(
2003 instrument_id,
2004 BookAction::Delete,
2005 BookOrder::new(
2006 OrderSide::Buy,
2007 bid_price,
2008 Quantity::new(0.0, instrument.size_precision()),
2009 0,
2010 ),
2011 0,
2012 0,
2013 ts_init,
2014 ts_init,
2015 ));
2016 } else {
2017 temp_deltas.push(OrderBookDelta::new(
2019 instrument_id,
2020 BookAction::Delete,
2021 BookOrder::new(
2022 OrderSide::Buy,
2023 bid_price,
2024 Quantity::new(0.0, instrument.size_precision()),
2025 0,
2026 ),
2027 0,
2028 0,
2029 ts_init,
2030 ts_init,
2031 ));
2032 temp_deltas.push(OrderBookDelta::new(
2033 instrument_id,
2034 BookAction::Delete,
2035 BookOrder::new(
2036 OrderSide::Sell,
2037 ask_price,
2038 Quantity::new(0.0, instrument.size_precision()),
2039 0,
2040 ),
2041 0,
2042 0,
2043 ts_init,
2044 ts_init,
2045 ));
2046 }
2047
2048 let temp_deltas_obj =
2050 nautilus_model::data::OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2051 book.apply_deltas(&temp_deltas_obj)?;
2052 all_deltas.extend(temp_deltas);
2053
2054 is_crossed = if let (Some(bid_price), Some(ask_price)) =
2056 (book.best_bid_price(), book.best_ask_price())
2057 {
2058 bid_price >= ask_price
2059 } else {
2060 false
2061 };
2062 }
2063
2064 if let Some(last_delta) = all_deltas.last_mut() {
2066 last_delta.flags = RecordFlag::F_LAST as u8;
2067 }
2068
2069 Ok(nautilus_model::data::OrderBookDeltas::new(
2070 instrument_id,
2071 all_deltas,
2072 ))
2073 }
2074
2075 fn handle_deltas_message(
2076 deltas: nautilus_model::data::OrderBookDeltas,
2077 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2078 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2079 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2080 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2081 ) {
2082 use nautilus_model::enums::BookType;
2083
2084 let instrument_id = deltas.instrument_id;
2085
2086 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2088 Some(inst) => inst.clone(),
2089 None => {
2090 tracing::error!(
2091 "Cannot resolve crossed order book: no instrument for {instrument_id}"
2092 );
2093 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2095 OrderBookDeltas_API::new(deltas),
2096 ))) {
2097 tracing::error!("Failed to emit order book deltas: {e}");
2098 }
2099 return;
2100 }
2101 };
2102
2103 let mut book = order_books
2105 .entry(instrument_id)
2106 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2107
2108 let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2110 {
2111 Ok(d) => d,
2112 Err(e) => {
2113 tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2114 return;
2115 }
2116 };
2117
2118 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2121 (book.best_bid_price(), book.best_ask_price())
2122 && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2123 {
2124 Some(QuoteTick::new(
2125 instrument_id,
2126 bid_price,
2127 ask_price,
2128 bid_size,
2129 ask_size,
2130 resolved_deltas.ts_event,
2131 resolved_deltas.ts_init,
2132 ))
2133 } else {
2134 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2136 tracing::debug!(
2137 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2138 );
2139 last_quotes.get(&instrument_id).map(|q| *q)
2140 } else {
2141 None
2142 }
2143 };
2144
2145 if let Some(quote) = quote_opt {
2146 let emit_quote =
2148 !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2149
2150 if emit_quote {
2151 last_quotes.insert(instrument_id, quote);
2152 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2153 tracing::error!("Failed to emit quote tick: {e}");
2154 }
2155 }
2156 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2157 tracing::debug!(
2159 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2160 book.best_bid_price(),
2161 book.best_ask_price()
2162 );
2163 }
2164
2165 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2167 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2168 tracing::error!("Failed to emit order book deltas event: {e}");
2169 }
2170 }
2171
2172 fn handle_oracle_prices(
2173 oracle_prices: std::collections::HashMap<
2174 String,
2175 crate::websocket::types::DydxOraclePriceMarket,
2176 >,
2177 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2178 _data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2179 ) {
2180 use crate::types::DydxOraclePrice;
2181
2182 let ts_init = get_atomic_clock_realtime().get_time_ns();
2183
2184 for (symbol_str, oracle_market) in oracle_prices {
2185 let symbol = Ustr::from(&symbol_str);
2186
2187 let Some(instrument) = instruments.get(&symbol) else {
2189 tracing::debug!(
2190 symbol = %symbol,
2191 "Received oracle price for unknown instrument (not cached yet)"
2192 );
2193 continue;
2194 };
2195
2196 let instrument_id = instrument.id();
2197
2198 let oracle_price_str = &oracle_market.oracle_price;
2200 let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2201 tracing::error!(
2202 symbol = %symbol,
2203 price_str = %oracle_price_str,
2204 "Failed to parse oracle price as f64"
2205 );
2206 continue;
2207 };
2208
2209 let price_precision = instrument.price_precision();
2210 let oracle_price = Price::from_raw(
2211 (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2212 price_precision,
2213 );
2214
2215 let oracle_price_event = DydxOraclePrice::new(
2216 instrument_id,
2217 oracle_price,
2218 ts_init, ts_init,
2220 );
2221
2222 tracing::debug!(
2223 instrument_id = %instrument_id,
2224 oracle_price = %oracle_price,
2225 "Received dYdX oracle price: {oracle_price_event:?} (not yet forwarded as Data event)"
2226 );
2227
2228 }
2231 }
2232}
2233
2234#[cfg(test)]
2238mod tests {
2239 use std::{collections::HashMap, net::SocketAddr};
2240
2241 use axum::{
2242 Router,
2243 extract::{Path, Query, State},
2244 response::Json,
2245 routing::get,
2246 };
2247 use indexmap::IndexMap;
2248 use nautilus_common::{
2249 messages::{DataEvent, data::DataResponse},
2250 runner::set_data_event_sender,
2251 };
2252 use nautilus_core::UUID4;
2253 use nautilus_model::{
2254 data::{
2255 BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2256 TradeTick, order::BookOrder,
2257 },
2258 enums::{
2259 AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2260 PriceType,
2261 },
2262 identifiers::{ClientId, InstrumentId, Symbol, Venue},
2263 instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2264 orderbook::OrderBook,
2265 types::{Price, Quantity},
2266 };
2267 use rstest::rstest;
2268 use rust_decimal::Decimal;
2269 use rust_decimal_macros::dec;
2270 use tokio::net::TcpListener;
2271
2272 use super::*;
2273 use crate::http::models::{Candle, CandlesResponse};
2274
2275 fn setup_test_env() {
2276 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2278 set_data_event_sender(sender);
2279 }
2280
2281 #[rstest]
2282 fn test_new_data_client() {
2283 setup_test_env();
2284
2285 let client_id = ClientId::from("DYDX-001");
2286 let config = DydxDataClientConfig::default();
2287 let http_client = DydxHttpClient::default();
2288
2289 let client = DydxDataClient::new(client_id, config, http_client, None);
2290 assert!(client.is_ok());
2291
2292 let client = client.unwrap();
2293 assert_eq!(client.client_id(), client_id);
2294 assert_eq!(client.venue(), *DYDX_VENUE);
2295 assert!(!client.is_connected());
2296 }
2297
2298 #[tokio::test]
2299 async fn test_data_client_lifecycle() {
2300 setup_test_env();
2301
2302 let client_id = ClientId::from("DYDX-001");
2303 let config = DydxDataClientConfig::default();
2304 let http_client = DydxHttpClient::default();
2305
2306 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2307
2308 assert!(client.start().is_ok());
2310
2311 assert!(client.stop().is_ok());
2313 assert!(!client.is_connected());
2314
2315 assert!(client.reset().is_ok());
2317
2318 assert!(client.dispose().is_ok());
2320 }
2321
2322 #[rstest]
2323 fn test_subscribe_unsubscribe_instruments_noop() {
2324 setup_test_env();
2325
2326 let client_id = ClientId::from("DYDX-TEST");
2327 let config = DydxDataClientConfig::default();
2328 let http_client = DydxHttpClient::default();
2329
2330 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2331
2332 let venue = *DYDX_VENUE;
2333 let command_id = UUID4::new();
2334 let ts_init = get_atomic_clock_realtime().get_time_ns();
2335
2336 let subscribe = SubscribeInstruments {
2337 client_id: Some(client_id),
2338 venue,
2339 command_id,
2340 ts_init,
2341 params: None,
2342 };
2343 let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2344
2345 assert!(client.subscribe_instruments(&subscribe).is_ok());
2347 assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2348 }
2349
2350 #[tokio::test]
2351 async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2352 setup_test_env();
2353
2354 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2355 let instruments = Arc::new(DashMap::new());
2356 let order_books = Arc::new(DashMap::new());
2357 let last_quotes = Arc::new(DashMap::new());
2358 let ws_client: Option<DydxWebSocketClient> = None;
2359 let active_orderbook_subs = Arc::new(DashMap::new());
2360 let active_trade_subs = Arc::new(DashMap::new());
2361 let active_bar_subs = Arc::new(DashMap::new());
2362
2363 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2364 let bar_ts = get_atomic_clock_realtime().get_time_ns();
2365
2366 use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2368 let symbol = Symbol::from("BTC-USD-PERP");
2369 let instrument = CryptoPerpetual::new(
2370 instrument_id,
2371 symbol,
2372 Currency::BTC(),
2373 Currency::USD(),
2374 Currency::USD(),
2375 false,
2376 2,
2377 4,
2378 Price::from("0.01"),
2379 Quantity::from("0.0001"),
2380 None,
2381 None,
2382 None,
2383 None,
2384 None,
2385 None,
2386 None,
2387 None,
2388 None,
2389 None,
2390 None,
2391 None,
2392 bar_ts,
2393 bar_ts,
2394 );
2395 instruments.insert(
2396 Ustr::from("BTC-USD-PERP"),
2397 InstrumentAny::CryptoPerpetual(instrument),
2398 );
2399
2400 let price = Price::from("100.00");
2401 let size = Quantity::from("1.0");
2402
2403 let bid_delta = OrderBookDelta::new(
2405 instrument_id,
2406 BookAction::Add,
2407 nautilus_model::data::order::BookOrder::new(
2408 nautilus_model::enums::OrderSide::Buy,
2409 price,
2410 size,
2411 1,
2412 ),
2413 0,
2414 1,
2415 bar_ts,
2416 bar_ts,
2417 );
2418 let ask_delta = OrderBookDelta::new(
2419 instrument_id,
2420 BookAction::Add,
2421 nautilus_model::data::order::BookOrder::new(
2422 nautilus_model::enums::OrderSide::Sell,
2423 Price::from("101.00"),
2424 size,
2425 1,
2426 ),
2427 0,
2428 1,
2429 bar_ts,
2430 bar_ts,
2431 );
2432 let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2433
2434 let message = crate::websocket::messages::NautilusWsMessage::Deltas(Box::new(deltas));
2435
2436 let ctx = WsMessageContext {
2437 data_sender: &sender,
2438 instruments: &instruments,
2439 order_books: &order_books,
2440 last_quotes: &last_quotes,
2441 ws_client: &ws_client,
2442 active_orderbook_subs: &active_orderbook_subs,
2443 active_trade_subs: &active_trade_subs,
2444 active_bar_subs: &active_bar_subs,
2445 };
2446 DydxDataClient::handle_ws_message(message, &ctx);
2447
2448 assert!(order_books.get(&instrument_id).is_some());
2450 assert!(last_quotes.get(&instrument_id).is_some());
2451
2452 let mut saw_quote = false;
2454 let mut saw_deltas = false;
2455
2456 while let Ok(event) = rx.try_recv() {
2457 if let DataEvent::Data(data) = event {
2458 match data {
2459 NautilusData::Quote(_) => saw_quote = true,
2460 NautilusData::Deltas(_) => saw_deltas = true,
2461 _ => {}
2462 }
2463 }
2464 }
2465
2466 assert!(saw_quote);
2467 assert!(saw_deltas);
2468 }
2469
2470 #[rstest]
2471 fn test_handle_ws_message_error_does_not_panic() {
2472 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2475 let instruments = Arc::new(DashMap::new());
2476 let order_books = Arc::new(DashMap::new());
2477 let last_quotes = Arc::new(DashMap::new());
2478 let ws_client: Option<DydxWebSocketClient> = None;
2479 let active_orderbook_subs = Arc::new(DashMap::new());
2480 let active_trade_subs = Arc::new(DashMap::new());
2481 let active_bar_subs = Arc::new(DashMap::new());
2482
2483 let ctx = WsMessageContext {
2484 data_sender: &sender,
2485 instruments: &instruments,
2486 order_books: &order_books,
2487 last_quotes: &last_quotes,
2488 ws_client: &ws_client,
2489 active_orderbook_subs: &active_orderbook_subs,
2490 active_trade_subs: &active_trade_subs,
2491 active_bar_subs: &active_bar_subs,
2492 };
2493
2494 let err = crate::websocket::error::DydxWebSocketError::from_message(
2495 "malformed WebSocket payload".to_string(),
2496 );
2497
2498 DydxDataClient::handle_ws_message(
2499 crate::websocket::messages::NautilusWsMessage::Error(err),
2500 &ctx,
2501 );
2502 }
2503
2504 #[tokio::test]
2505 async fn test_request_bars_partitioning_math_does_not_panic() {
2506 setup_test_env();
2507
2508 let client_id = ClientId::from("DYDX-BARS");
2509 let config = DydxDataClientConfig::default();
2510 let http_client = DydxHttpClient::default();
2511
2512 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2513
2514 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2515 let spec = BarSpecification {
2516 step: std::num::NonZeroUsize::new(1).unwrap(),
2517 aggregation: BarAggregation::Minute,
2518 price_type: PriceType::Last,
2519 };
2520 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2521
2522 let now = chrono::Utc::now();
2523 let start = Some(now - chrono::Duration::hours(10));
2524 let end = Some(now);
2525
2526 let request = RequestBars::new(
2527 bar_type,
2528 start,
2529 end,
2530 None,
2531 Some(client_id),
2532 UUID4::new(),
2533 get_atomic_clock_realtime().get_time_ns(),
2534 None,
2535 );
2536
2537 assert!(client.request_bars(&request).is_ok());
2540 }
2541
2542 #[tokio::test]
2543 async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2544 setup_test_env();
2545
2546 let now = chrono::Utc::now();
2548 let candle = crate::http::models::Candle {
2549 started_at: now - chrono::Duration::minutes(1),
2550 ticker: "BTC-USD".to_string(),
2551 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2552 open: dec!(100.0),
2553 high: dec!(101.0),
2554 low: dec!(99.0),
2555 close: dec!(100.5),
2556 base_token_volume: dec!(1.0),
2557 usd_volume: dec!(100.0),
2558 trades: 10,
2559 starting_open_interest: dec!(1000.0),
2560 };
2561 let candles_response = crate::http::models::CandlesResponse {
2562 candles: vec![candle],
2563 };
2564 let state = CandlesTestState {
2565 response: Arc::new(candles_response),
2566 };
2567 let addr = start_candles_test_server(state).await;
2568 let base_url = format!("http://{}", addr);
2569
2570 let client_id = ClientId::from("DYDX-BARS-MONTHS");
2571 let config = DydxDataClientConfig {
2572 base_url_http: Some(base_url),
2573 is_testnet: true,
2574 ..Default::default()
2575 };
2576
2577 let http_client = DydxHttpClient::new(
2578 config.base_url_http.clone(),
2579 config.http_timeout_secs,
2580 config.http_proxy_url.clone(),
2581 config.is_testnet,
2582 None,
2583 )
2584 .unwrap();
2585
2586 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2587
2588 let instrument = create_test_instrument_any();
2590 let instrument_id = instrument.id();
2591 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2592 client.instruments.insert(symbol_key, instrument);
2593
2594 let spec = BarSpecification {
2595 step: std::num::NonZeroUsize::new(1).unwrap(),
2596 aggregation: BarAggregation::Minute,
2597 price_type: PriceType::Last,
2598 };
2599 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2600
2601 let start = Some(now - chrono::Duration::days(90));
2603 let end = Some(now);
2604
2605 let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2607
2608 let request = RequestBars::new(
2609 bar_type,
2610 start,
2611 end,
2612 limit,
2613 Some(client_id),
2614 UUID4::new(),
2615 get_atomic_clock_realtime().get_time_ns(),
2616 None,
2617 );
2618
2619 assert!(client.request_bars(&request).is_ok());
2620 }
2621
2622 #[derive(Clone)]
2623 struct OrderbookTestState {
2624 snapshot: Arc<crate::http::models::OrderbookResponse>,
2625 }
2626
2627 #[derive(Clone)]
2628 struct TradesTestState {
2629 response: Arc<crate::http::models::TradesResponse>,
2630 last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2631 last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2632 }
2633
2634 #[derive(Clone)]
2635 struct CandlesTestState {
2636 response: Arc<crate::http::models::CandlesResponse>,
2637 }
2638
2639 async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2640 async fn handle_orderbook(
2641 Path(_ticker): Path<String>,
2642 State(state): State<OrderbookTestState>,
2643 ) -> Json<crate::http::models::OrderbookResponse> {
2644 Json((*state.snapshot).clone())
2645 }
2646
2647 let router = Router::new().route(
2648 "/v4/orderbooks/perpetualMarket/{ticker}",
2649 get(handle_orderbook).with_state(state),
2650 );
2651
2652 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2653 let addr = listener.local_addr().unwrap();
2654
2655 tokio::spawn(async move {
2656 axum::serve(listener, router.into_make_service())
2657 .await
2658 .unwrap();
2659 });
2660
2661 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2662 addr
2663 }
2664
2665 async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2666 async fn handle_trades(
2667 Path(ticker): Path<String>,
2668 Query(params): Query<HashMap<String, String>>,
2669 State(state): State<TradesTestState>,
2670 ) -> Json<crate::http::models::TradesResponse> {
2671 {
2672 let mut last_ticker = state.last_ticker.lock().await;
2673 *last_ticker = Some(ticker);
2674 }
2675
2676 let limit = params
2677 .get("limit")
2678 .and_then(|value| value.parse::<u32>().ok());
2679 {
2680 let mut last_limit = state.last_limit.lock().await;
2681 *last_limit = Some(limit);
2682 }
2683
2684 Json((*state.response).clone())
2685 }
2686
2687 let router = Router::new().route(
2688 "/v4/trades/perpetualMarket/{ticker}",
2689 get(handle_trades).with_state(state),
2690 );
2691
2692 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2693 let addr = listener.local_addr().unwrap();
2694
2695 tokio::spawn(async move {
2696 axum::serve(listener, router.into_make_service())
2697 .await
2698 .unwrap();
2699 });
2700
2701 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2702 addr
2703 }
2704
2705 async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2706 async fn handle_candles(
2707 Path(_ticker): Path<String>,
2708 Query(_params): Query<HashMap<String, String>>,
2709 State(state): State<CandlesTestState>,
2710 ) -> Json<crate::http::models::CandlesResponse> {
2711 Json((*state.response).clone())
2712 }
2713
2714 let router = Router::new().route(
2715 "/v4/candles/perpetualMarkets/{ticker}",
2716 get(handle_candles).with_state(state),
2717 );
2718
2719 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2720 let addr = listener.local_addr().unwrap();
2721
2722 tokio::spawn(async move {
2723 axum::serve(listener, router.into_make_service())
2724 .await
2725 .unwrap();
2726 });
2727
2728 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2729 addr
2730 }
2731
2732 fn create_test_instrument_any() -> InstrumentAny {
2733 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2734
2735 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2736 instrument_id,
2737 instrument_id.symbol,
2738 nautilus_model::types::currency::Currency::BTC(),
2739 nautilus_model::types::currency::Currency::USD(),
2740 nautilus_model::types::currency::Currency::USD(),
2741 false,
2742 2, 8, Price::new(0.01, 2), Quantity::new(0.001, 8), Some(Quantity::new(1.0, 0)), Some(Quantity::new(0.001, 8)), Some(Quantity::new(100000.0, 8)), Some(Quantity::new(0.001, 8)), None, None, Some(Price::new(1000000.0, 2)), Some(Price::new(0.01, 2)), Some(dec!(0.05)), Some(dec!(0.03)), Some(dec!(0.0002)), Some(dec!(0.0005)), UnixNanos::default(), UnixNanos::default(), ))
2761 }
2762
2763 #[tokio::test]
2768 async fn test_candle_to_bar_price_size_edge_cases() {
2769 setup_test_env();
2770
2771 let clock = get_atomic_clock_realtime();
2772 let now = chrono::Utc::now();
2773
2774 let candle = Candle {
2776 started_at: now,
2777 ticker: "BTC-USD".to_string(),
2778 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2779 open: dec!(123456789.123456),
2780 high: dec!(987654321.987654), low: dec!(123456.789), close: dec!(223456789.123456), base_token_volume: dec!(0.00000001),
2784 usd_volume: dec!(1234500.0),
2785 trades: 42,
2786 starting_open_interest: dec!(1000.0),
2787 };
2788
2789 let instrument = create_test_instrument_any();
2790 let instrument_id = instrument.id();
2791 let spec = BarSpecification {
2792 step: std::num::NonZeroUsize::new(1).unwrap(),
2793 aggregation: BarAggregation::Minute,
2794 price_type: PriceType::Last,
2795 };
2796 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2797
2798 let bar = DydxDataClient::candle_to_bar(
2799 &candle,
2800 bar_type,
2801 instrument.price_precision(),
2802 instrument.size_precision(),
2803 60,
2804 clock,
2805 )
2806 .expect("candle_to_bar should handle large/scientific values");
2807
2808 assert!(bar.open.as_f64() > 0.0);
2809 assert!(bar.high.as_f64() >= bar.low.as_f64());
2810 assert!(bar.volume.as_f64() > 0.0);
2811 }
2812
2813 #[tokio::test]
2814 async fn test_candle_to_bar_ts_event_overflow_safe() {
2815 setup_test_env();
2816
2817 let clock = get_atomic_clock_realtime();
2818 let now = chrono::Utc::now();
2819
2820 let candle = Candle {
2821 started_at: now,
2822 ticker: "BTC-USD".to_string(),
2823 resolution: crate::common::enums::DydxCandleResolution::OneDay,
2824 open: Decimal::from(1),
2825 high: Decimal::from(1),
2826 low: Decimal::from(1),
2827 close: Decimal::from(1),
2828 base_token_volume: Decimal::from(1),
2829 usd_volume: Decimal::from(1),
2830 trades: 1,
2831 starting_open_interest: Decimal::from(1),
2832 };
2833
2834 let instrument = create_test_instrument_any();
2835 let instrument_id = instrument.id();
2836 let spec = BarSpecification {
2837 step: std::num::NonZeroUsize::new(1).unwrap(),
2838 aggregation: BarAggregation::Day,
2839 price_type: PriceType::Last,
2840 };
2841 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2842
2843 let bar_secs = i64::MAX / 1_000_000_000;
2845 let bar = DydxDataClient::candle_to_bar(
2846 &candle,
2847 bar_type,
2848 instrument.price_precision(),
2849 instrument.size_precision(),
2850 bar_secs,
2851 clock,
2852 )
2853 .expect("candle_to_bar should not overflow on ts_event");
2854
2855 assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2856 }
2857
2858 #[tokio::test]
2859 async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2860 let clock = get_atomic_clock_realtime();
2863 let now = chrono::Utc::now();
2864
2865 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2868 set_data_event_sender(sender);
2869
2870 let candle_past = Candle {
2872 started_at: now - chrono::Duration::minutes(2),
2873 ticker: "BTC-USD".to_string(),
2874 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2875 open: Decimal::from(1),
2876 high: Decimal::from(2),
2877 low: Decimal::from(1),
2878 close: Decimal::from(1),
2879 base_token_volume: Decimal::from(1),
2880 usd_volume: Decimal::from(1),
2881 trades: 1,
2882 starting_open_interest: Decimal::from(1),
2883 };
2884 let candle_future = Candle {
2885 started_at: now + chrono::Duration::minutes(2),
2886 ..candle_past.clone()
2887 };
2888
2889 let candles_response = CandlesResponse {
2890 candles: vec![candle_past, candle_future],
2891 };
2892
2893 let state = CandlesTestState {
2894 response: Arc::new(candles_response),
2895 };
2896 let addr = start_candles_test_server(state).await;
2897 let base_url = format!("http://{}", addr);
2898
2899 let client_id = ClientId::from("DYDX-BARS-SKEW");
2900 let config = DydxDataClientConfig {
2901 base_url_http: Some(base_url),
2902 is_testnet: true,
2903 ..Default::default()
2904 };
2905
2906 let http_client = DydxHttpClient::new(
2907 config.base_url_http.clone(),
2908 config.http_timeout_secs,
2909 config.http_proxy_url.clone(),
2910 config.is_testnet,
2911 None,
2912 )
2913 .unwrap();
2914
2915 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2916
2917 let instrument = create_test_instrument_any();
2918 let instrument_id = instrument.id();
2919 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
2920 client.instruments.insert(symbol_key, instrument);
2921
2922 let spec = BarSpecification {
2923 step: std::num::NonZeroUsize::new(1).unwrap(),
2924 aggregation: BarAggregation::Minute,
2925 price_type: PriceType::Last,
2926 };
2927 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2928
2929 let request = RequestBars::new(
2930 bar_type,
2931 Some(now - chrono::Duration::minutes(5)),
2932 Some(now + chrono::Duration::minutes(5)),
2933 None,
2934 Some(client_id),
2935 UUID4::new(),
2936 clock.get_time_ns(),
2937 None,
2938 );
2939
2940 assert!(client.request_bars(&request).is_ok());
2941
2942 let timeout = tokio::time::Duration::from_secs(3);
2943 if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
2944 tokio::time::timeout(timeout, rx.recv()).await
2945 {
2946 assert_eq!(resp.data.len(), 1);
2948 }
2949 }
2950
2951 #[rstest]
2952 fn test_decimal_to_f64_precision_loss_within_tolerance() {
2953 let price_value = 12345.125_f64;
2955 let qty_value = 0.00012345_f64;
2956
2957 let price = Price::new(price_value, 6);
2958 let qty = Quantity::new(qty_value, 8);
2959
2960 let price_diff = (price.as_f64() - price_value).abs();
2961 let qty_diff = (qty.as_f64() - qty_value).abs();
2962
2963 assert!(price_diff < 1e-10);
2965 assert!(qty_diff < 1e-12);
2966 }
2967
2968 #[tokio::test]
2969 async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
2970 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2972 set_data_event_sender(sender);
2973
2974 let snapshot = crate::http::models::OrderbookResponse {
2976 bids: vec![crate::http::models::OrderbookLevel {
2977 price: dec!(100.0),
2978 size: dec!(1.0),
2979 }],
2980 asks: vec![crate::http::models::OrderbookLevel {
2981 price: dec!(101.0),
2982 size: dec!(2.0),
2983 }],
2984 };
2985 let state = OrderbookTestState {
2986 snapshot: Arc::new(snapshot),
2987 };
2988 let addr = start_orderbook_test_server(state).await;
2989 let base_url = format!("http://{}", addr);
2990
2991 let client_id = ClientId::from("DYDX-REFRESH");
2993 let config = DydxDataClientConfig {
2994 is_testnet: true,
2995 base_url_http: Some(base_url),
2996 orderbook_refresh_interval_secs: Some(1),
2997 instrument_refresh_interval_secs: None,
2998 ..Default::default()
2999 };
3000
3001 let http_client = DydxHttpClient::new(
3002 config.base_url_http.clone(),
3003 config.http_timeout_secs,
3004 config.http_proxy_url.clone(),
3005 config.is_testnet,
3006 None,
3007 )
3008 .unwrap();
3009
3010 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3011
3012 let instrument = create_test_instrument_any();
3014 let instrument_id = instrument.id();
3015 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3016 client.instruments.insert(symbol_key, instrument);
3017 client.order_books.insert(
3018 instrument_id,
3019 OrderBook::new(instrument_id, BookType::L2_MBP),
3020 );
3021 client.active_orderbook_subs.insert(instrument_id, ());
3022
3023 client.start_orderbook_refresh_task().unwrap();
3025
3026 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3027 let mut saw_snapshot_event = false;
3028
3029 while std::time::Instant::now() < deadline {
3030 if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3031 tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3032 {
3033 saw_snapshot_event = true;
3034 break;
3035 }
3036 }
3037
3038 assert!(
3039 saw_snapshot_event,
3040 "expected at least one snapshot deltas event from refresh task"
3041 );
3042
3043 let book = client
3045 .order_books
3046 .get(&instrument_id)
3047 .expect("orderbook should exist after refresh");
3048 let best_bid = book.best_bid_price().expect("best bid should be set");
3049 let best_ask = book.best_ask_price().expect("best ask should be set");
3050
3051 assert_eq!(best_bid, Price::from("100.00"));
3052 assert_eq!(best_ask, Price::from("101.00"));
3053 }
3054
3055 #[rstest]
3056 fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3057 let instrument = create_test_instrument_any();
3060 let instrument_id = instrument.id();
3061 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3062 let ts_init = get_atomic_clock_realtime().get_time_ns();
3063
3064 let initial_deltas = vec![
3066 OrderBookDelta::new(
3067 instrument_id,
3068 BookAction::Add,
3069 BookOrder::new(
3070 OrderSide::Buy,
3071 Price::from("99.00"),
3072 Quantity::from("1.0"),
3073 0,
3074 ),
3075 0,
3076 0,
3077 ts_init,
3078 ts_init,
3079 ),
3080 OrderBookDelta::new(
3081 instrument_id,
3082 BookAction::Add,
3083 BookOrder::new(
3084 OrderSide::Sell,
3085 Price::from("101.00"),
3086 Quantity::from("2.0"),
3087 0,
3088 ),
3089 0,
3090 0,
3091 ts_init,
3092 ts_init,
3093 ),
3094 ];
3095 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3096 .unwrap();
3097
3098 let crossed_deltas = vec![OrderBookDelta::new(
3100 instrument_id,
3101 BookAction::Add,
3102 BookOrder::new(
3103 OrderSide::Buy,
3104 Price::from("102.00"),
3105 Quantity::from("5.0"),
3106 0,
3107 ),
3108 0,
3109 0,
3110 ts_init,
3111 ts_init,
3112 )];
3113 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3114
3115 let resolved =
3116 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3117 .unwrap();
3118
3119 assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3122 assert!(book.best_bid_size().unwrap().as_f64() < 5.0); assert!(
3124 book.best_ask_price().is_none()
3125 || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3126 ); assert!(resolved.deltas.len() > 1); }
3131
3132 #[rstest]
3133 fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3134 let instrument = create_test_instrument_any();
3137 let instrument_id = instrument.id();
3138 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3139 let ts_init = get_atomic_clock_realtime().get_time_ns();
3140
3141 let initial_deltas = vec![
3143 OrderBookDelta::new(
3144 instrument_id,
3145 BookAction::Add,
3146 BookOrder::new(
3147 OrderSide::Buy,
3148 Price::from("99.00"),
3149 Quantity::from("1.0"),
3150 0,
3151 ),
3152 0,
3153 0,
3154 ts_init,
3155 ts_init,
3156 ),
3157 OrderBookDelta::new(
3158 instrument_id,
3159 BookAction::Add,
3160 BookOrder::new(
3161 OrderSide::Sell,
3162 Price::from("101.00"),
3163 Quantity::from("5.0"),
3164 0,
3165 ),
3166 0,
3167 0,
3168 ts_init,
3169 ts_init,
3170 ),
3171 ];
3172 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3173 .unwrap();
3174
3175 let crossed_deltas = vec![OrderBookDelta::new(
3177 instrument_id,
3178 BookAction::Add,
3179 BookOrder::new(
3180 OrderSide::Buy,
3181 Price::from("102.00"),
3182 Quantity::from("2.0"),
3183 0,
3184 ),
3185 0,
3186 0,
3187 ts_init,
3188 ts_init,
3189 )];
3190 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3191
3192 let resolved =
3193 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3194 .unwrap();
3195
3196 assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3198 assert!(book.best_ask_size().unwrap().as_f64() < 5.0); assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); assert!(resolved.deltas.len() > 1);
3204 }
3205
3206 #[rstest]
3207 fn test_resolve_crossed_order_book_equal_sizes() {
3208 let instrument = create_test_instrument_any();
3211 let instrument_id = instrument.id();
3212 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3213 let ts_init = get_atomic_clock_realtime().get_time_ns();
3214
3215 let initial_deltas = vec![
3217 OrderBookDelta::new(
3218 instrument_id,
3219 BookAction::Add,
3220 BookOrder::new(
3221 OrderSide::Buy,
3222 Price::from("99.00"),
3223 Quantity::from("1.0"),
3224 0,
3225 ),
3226 0,
3227 0,
3228 ts_init,
3229 ts_init,
3230 ),
3231 OrderBookDelta::new(
3232 instrument_id,
3233 BookAction::Add,
3234 BookOrder::new(
3235 OrderSide::Sell,
3236 Price::from("101.00"),
3237 Quantity::from("3.0"),
3238 0,
3239 ),
3240 0,
3241 0,
3242 ts_init,
3243 ts_init,
3244 ),
3245 ];
3246 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3247 .unwrap();
3248
3249 let crossed_deltas = vec![OrderBookDelta::new(
3251 instrument_id,
3252 BookAction::Add,
3253 BookOrder::new(
3254 OrderSide::Buy,
3255 Price::from("102.00"),
3256 Quantity::from("3.0"),
3257 0,
3258 ),
3259 0,
3260 0,
3261 ts_init,
3262 ts_init,
3263 )];
3264 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3265
3266 let resolved =
3267 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3268 .unwrap();
3269
3270 assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); if let Some(ask_price) = book.best_ask_price() {
3274 assert!(ask_price > book.best_bid_price().unwrap()); }
3276
3277 assert!(resolved.deltas.len() > 1);
3279 }
3280
3281 #[rstest]
3282 fn test_resolve_crossed_order_book_multiple_iterations() {
3283 let instrument = create_test_instrument_any();
3285 let instrument_id = instrument.id();
3286 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3287 let ts_init = get_atomic_clock_realtime().get_time_ns();
3288
3289 let initial_deltas = vec![
3291 OrderBookDelta::new(
3292 instrument_id,
3293 BookAction::Add,
3294 BookOrder::new(
3295 OrderSide::Buy,
3296 Price::from("98.00"),
3297 Quantity::from("1.0"),
3298 0,
3299 ),
3300 0,
3301 0,
3302 ts_init,
3303 ts_init,
3304 ),
3305 OrderBookDelta::new(
3306 instrument_id,
3307 BookAction::Add,
3308 BookOrder::new(
3309 OrderSide::Sell,
3310 Price::from("100.00"),
3311 Quantity::from("1.0"),
3312 0,
3313 ),
3314 0,
3315 0,
3316 ts_init,
3317 ts_init,
3318 ),
3319 OrderBookDelta::new(
3320 instrument_id,
3321 BookAction::Add,
3322 BookOrder::new(
3323 OrderSide::Sell,
3324 Price::from("101.00"),
3325 Quantity::from("1.0"),
3326 0,
3327 ),
3328 0,
3329 0,
3330 ts_init,
3331 ts_init,
3332 ),
3333 ];
3334 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3335 .unwrap();
3336
3337 let crossed_deltas = vec![
3339 OrderBookDelta::new(
3340 instrument_id,
3341 BookAction::Add,
3342 BookOrder::new(
3343 OrderSide::Buy,
3344 Price::from("102.00"),
3345 Quantity::from("1.0"),
3346 0,
3347 ),
3348 0,
3349 0,
3350 ts_init,
3351 ts_init,
3352 ),
3353 OrderBookDelta::new(
3354 instrument_id,
3355 BookAction::Add,
3356 BookOrder::new(
3357 OrderSide::Buy,
3358 Price::from("103.00"),
3359 Quantity::from("1.0"),
3360 0,
3361 ),
3362 0,
3363 0,
3364 ts_init,
3365 ts_init,
3366 ),
3367 ];
3368 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3369
3370 let resolved =
3371 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3372 .unwrap();
3373
3374 if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3376 assert!(ask_price > bid_price, "Book should be uncrossed");
3377 }
3378
3379 assert!(resolved.deltas.len() > 2); }
3382
3383 #[rstest]
3384 fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3385 let instrument = create_test_instrument_any();
3387 let instrument_id = instrument.id();
3388 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3389 let ts_init = get_atomic_clock_realtime().get_time_ns();
3390
3391 let initial_deltas = vec![
3393 OrderBookDelta::new(
3394 instrument_id,
3395 BookAction::Add,
3396 BookOrder::new(
3397 OrderSide::Buy,
3398 Price::from("99.00"),
3399 Quantity::from("1.0"),
3400 0,
3401 ),
3402 0,
3403 0,
3404 ts_init,
3405 ts_init,
3406 ),
3407 OrderBookDelta::new(
3408 instrument_id,
3409 BookAction::Add,
3410 BookOrder::new(
3411 OrderSide::Sell,
3412 Price::from("101.00"),
3413 Quantity::from("1.0"),
3414 0,
3415 ),
3416 0,
3417 0,
3418 ts_init,
3419 ts_init,
3420 ),
3421 ];
3422 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3423 .unwrap();
3424
3425 let new_deltas = vec![OrderBookDelta::new(
3427 instrument_id,
3428 BookAction::Add,
3429 BookOrder::new(
3430 OrderSide::Buy,
3431 Price::from("98.50"),
3432 Quantity::from("2.0"),
3433 0,
3434 ),
3435 0,
3436 0,
3437 ts_init,
3438 ts_init,
3439 )];
3440 let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3441
3442 let original_bid = book.best_bid_price();
3443 let original_ask = book.best_ask_price();
3444
3445 let resolved =
3446 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3447 .unwrap();
3448
3449 assert_eq!(resolved.deltas.len(), new_deltas.len());
3451 assert_eq!(book.best_bid_price(), original_bid);
3452 assert_eq!(book.best_ask_price(), original_ask);
3453 assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3454 }
3455
3456 #[tokio::test]
3461 async fn test_request_instruments_successful_fetch() {
3462 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3464 set_data_event_sender(sender);
3465
3466 let client_id = ClientId::from("DYDX-TEST");
3467 let config = DydxDataClientConfig::default();
3468 let http_client = DydxHttpClient::default();
3469 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3470
3471 let request = RequestInstruments::new(
3472 None,
3473 None,
3474 Some(client_id),
3475 Some(*DYDX_VENUE),
3476 UUID4::new(),
3477 get_atomic_clock_realtime().get_time_ns(),
3478 None,
3479 );
3480
3481 assert!(client.request_instruments(&request).is_ok());
3483
3484 let timeout = tokio::time::Duration::from_secs(5);
3486 let result = tokio::time::timeout(timeout, rx.recv()).await;
3487
3488 match result {
3489 Ok(Some(DataEvent::Response(resp))) => {
3490 if let DataResponse::Instruments(inst_resp) = resp {
3491 assert_eq!(inst_resp.correlation_id, request.request_id);
3493 assert_eq!(inst_resp.client_id, client_id);
3494 assert_eq!(inst_resp.venue, *DYDX_VENUE);
3495 assert!(inst_resp.start.is_none());
3496 assert!(inst_resp.end.is_none());
3497 }
3499 }
3500 Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3501 Ok(None) => panic!("Channel closed unexpectedly"),
3502 Err(_) => {
3503 println!("Test timed out - testnet may be unreachable");
3505 }
3506 }
3507 }
3508
3509 #[tokio::test]
3510 async fn test_request_instruments_empty_response_on_http_error() {
3511 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3513 set_data_event_sender(sender);
3514
3515 let client_id = ClientId::from("DYDX-ERROR-TEST");
3516 let config = DydxDataClientConfig {
3517 base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3518 ..Default::default()
3519 };
3520 let http_client = DydxHttpClient::new(
3521 config.base_url_http.clone(),
3522 config.http_timeout_secs,
3523 config.http_proxy_url.clone(),
3524 config.is_testnet,
3525 None,
3526 )
3527 .unwrap();
3528
3529 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3530
3531 let request = RequestInstruments::new(
3532 None,
3533 None,
3534 Some(client_id),
3535 Some(*DYDX_VENUE),
3536 UUID4::new(),
3537 get_atomic_clock_realtime().get_time_ns(),
3538 None,
3539 );
3540
3541 assert!(client.request_instruments(&request).is_ok());
3542
3543 let timeout = tokio::time::Duration::from_secs(3);
3545 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3546 tokio::time::timeout(timeout, rx.recv()).await
3547 {
3548 assert!(
3549 resp.data.is_empty(),
3550 "Expected empty instruments on HTTP error"
3551 );
3552 assert_eq!(resp.correlation_id, request.request_id);
3553 assert_eq!(resp.client_id, client_id);
3554 }
3555 }
3556
3557 #[tokio::test]
3558 async fn test_request_instruments_caching() {
3559 setup_test_env();
3561
3562 let client_id = ClientId::from("DYDX-CACHE-TEST");
3563 let config = DydxDataClientConfig::default();
3564 let http_client = DydxHttpClient::default();
3565 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3566
3567 let initial_cache_size = client.instruments.len();
3568
3569 let request = RequestInstruments::new(
3570 None,
3571 None,
3572 Some(client_id),
3573 Some(*DYDX_VENUE),
3574 UUID4::new(),
3575 get_atomic_clock_realtime().get_time_ns(),
3576 None,
3577 );
3578
3579 assert!(client.request_instruments(&request).is_ok());
3580
3581 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3583
3584 let final_cache_size = client.instruments.len();
3586 assert!(final_cache_size >= initial_cache_size);
3589 }
3590
3591 #[tokio::test]
3592 async fn test_request_instruments_correlation_id_matching() {
3593 setup_test_env();
3595
3596 let client_id = ClientId::from("DYDX-CORR-TEST");
3597 let config = DydxDataClientConfig::default();
3598 let http_client = DydxHttpClient::default();
3599 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3600
3601 let request_id = UUID4::new();
3602 let request = RequestInstruments::new(
3603 None,
3604 None,
3605 Some(client_id),
3606 Some(*DYDX_VENUE),
3607 request_id,
3608 get_atomic_clock_realtime().get_time_ns(),
3609 None,
3610 );
3611
3612 assert!(client.request_instruments(&request).is_ok());
3614 }
3615
3616 #[tokio::test]
3617 async fn test_request_instruments_venue_assignment() {
3618 setup_test_env();
3620
3621 let client_id = ClientId::from("DYDX-VENUE-TEST");
3622 let config = DydxDataClientConfig::default();
3623 let http_client = DydxHttpClient::default();
3624 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3625
3626 assert_eq!(client.venue(), *DYDX_VENUE);
3627
3628 let request = RequestInstruments::new(
3629 None,
3630 None,
3631 Some(client_id),
3632 Some(*DYDX_VENUE),
3633 UUID4::new(),
3634 get_atomic_clock_realtime().get_time_ns(),
3635 None,
3636 );
3637
3638 assert!(client.request_instruments(&request).is_ok());
3639 }
3640
3641 #[tokio::test]
3642 async fn test_request_instruments_timestamp_handling() {
3643 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3645 set_data_event_sender(sender);
3646
3647 let client_id = ClientId::from("DYDX-TS-TEST");
3648 let config = DydxDataClientConfig::default();
3649 let http_client = DydxHttpClient::default();
3650 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3651
3652 let now = chrono::Utc::now();
3653 let start = Some(now - chrono::Duration::hours(24));
3654 let end = Some(now);
3655
3656 let request = RequestInstruments::new(
3657 start,
3658 end,
3659 Some(client_id),
3660 Some(*DYDX_VENUE),
3661 UUID4::new(),
3662 get_atomic_clock_realtime().get_time_ns(),
3663 None,
3664 );
3665
3666 assert!(client.request_instruments(&request).is_ok());
3667
3668 let timeout = tokio::time::Duration::from_secs(3);
3670 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3671 tokio::time::timeout(timeout, rx.recv()).await
3672 {
3673 assert!(resp.start.unwrap() > 0);
3675 assert!(resp.end.unwrap() > 0);
3676 assert!(resp.start.unwrap() <= resp.end.unwrap());
3677 assert!(resp.ts_init > 0);
3678 }
3679 }
3680
3681 #[tokio::test]
3682 async fn test_request_instruments_with_start_only() {
3683 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3685 set_data_event_sender(sender);
3686
3687 let client_id = ClientId::from("DYDX-TS-START-ONLY");
3688 let config = DydxDataClientConfig::default();
3689 let http_client = DydxHttpClient::default();
3690 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3691
3692 let now = chrono::Utc::now();
3693 let start = Some(now - chrono::Duration::hours(24));
3694
3695 let request = RequestInstruments::new(
3696 start,
3697 None,
3698 Some(client_id),
3699 Some(*DYDX_VENUE),
3700 UUID4::new(),
3701 get_atomic_clock_realtime().get_time_ns(),
3702 None,
3703 );
3704
3705 assert!(client.request_instruments(&request).is_ok());
3706
3707 let timeout = tokio::time::Duration::from_secs(3);
3708 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3709 tokio::time::timeout(timeout, rx.recv()).await
3710 {
3711 assert!(resp.start.is_some());
3712 assert!(resp.end.is_none());
3713 assert!(resp.ts_init > 0);
3714 }
3715 }
3716
3717 #[tokio::test]
3718 async fn test_request_instruments_with_end_only() {
3719 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3721 set_data_event_sender(sender);
3722
3723 let client_id = ClientId::from("DYDX-TS-END-ONLY");
3724 let config = DydxDataClientConfig::default();
3725 let http_client = DydxHttpClient::default();
3726 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3727
3728 let now = chrono::Utc::now();
3729 let end = Some(now);
3730
3731 let request = RequestInstruments::new(
3732 None,
3733 end,
3734 Some(client_id),
3735 Some(*DYDX_VENUE),
3736 UUID4::new(),
3737 get_atomic_clock_realtime().get_time_ns(),
3738 None,
3739 );
3740
3741 assert!(client.request_instruments(&request).is_ok());
3742
3743 let timeout = tokio::time::Duration::from_secs(3);
3744 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3745 tokio::time::timeout(timeout, rx.recv()).await
3746 {
3747 assert!(resp.start.is_none());
3748 assert!(resp.end.is_some());
3749 assert!(resp.ts_init > 0);
3750 }
3751 }
3752
3753 #[tokio::test]
3754 async fn test_request_instruments_client_id_fallback() {
3755 setup_test_env();
3757
3758 let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3759 let config = DydxDataClientConfig::default();
3760 let http_client = DydxHttpClient::default();
3761 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3762
3763 let request = RequestInstruments::new(
3764 None,
3765 None,
3766 None, Some(*DYDX_VENUE),
3768 UUID4::new(),
3769 get_atomic_clock_realtime().get_time_ns(),
3770 None,
3771 );
3772
3773 assert!(client.request_instruments(&request).is_ok());
3775 }
3776
3777 #[tokio::test]
3778 async fn test_request_instruments_with_params() {
3779 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3781 set_data_event_sender(sender);
3782
3783 let client_id = ClientId::from("DYDX-PARAMS-TEST");
3784 let config = DydxDataClientConfig::default();
3785 let http_client = DydxHttpClient::default();
3786 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3787
3788 let mut params_map = IndexMap::new();
3790 params_map.insert("test_key".to_string(), "test_value".to_string());
3791
3792 let request = RequestInstruments::new(
3793 None,
3794 None,
3795 Some(client_id),
3796 Some(*DYDX_VENUE),
3797 UUID4::new(),
3798 get_atomic_clock_realtime().get_time_ns(),
3799 Some(params_map),
3800 );
3801
3802 assert!(client.request_instruments(&request).is_ok());
3803
3804 let timeout = tokio::time::Duration::from_secs(3);
3806 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3807 tokio::time::timeout(timeout, rx.recv()).await
3808 {
3809 assert_eq!(resp.client_id, client_id);
3811 let params = resp
3812 .params
3813 .expect("expected params to be present in InstrumentsResponse");
3814 assert_eq!(
3815 params.get("test_key").map(String::as_str),
3816 Some("test_value")
3817 );
3818 }
3819 }
3820
3821 #[tokio::test]
3826 async fn test_request_instruments_with_start_and_end_range() {
3827 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3829 set_data_event_sender(sender);
3830
3831 let client_id = ClientId::from("DYDX-START-END-RANGE");
3832 let config = DydxDataClientConfig::default();
3833 let http_client = DydxHttpClient::default();
3834 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3835
3836 let now = chrono::Utc::now();
3837 let start = Some(now - chrono::Duration::hours(48));
3838 let end = Some(now - chrono::Duration::hours(24));
3839
3840 let request = RequestInstruments::new(
3841 start,
3842 end,
3843 Some(client_id),
3844 Some(*DYDX_VENUE),
3845 UUID4::new(),
3846 get_atomic_clock_realtime().get_time_ns(),
3847 None,
3848 );
3849
3850 assert!(client.request_instruments(&request).is_ok());
3851
3852 let timeout = tokio::time::Duration::from_secs(3);
3853 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3854 tokio::time::timeout(timeout, rx.recv()).await
3855 {
3856 assert!(
3858 resp.start.is_some(),
3859 "start timestamp should be present when provided"
3860 );
3861 assert!(
3862 resp.end.is_some(),
3863 "end timestamp should be present when provided"
3864 );
3865 assert!(resp.ts_init > 0, "ts_init should always be set");
3866
3867 if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3869 assert!(
3870 start_ts < end_ts,
3871 "start timestamp should be before end timestamp"
3872 );
3873 }
3874 }
3875 }
3876
3877 #[tokio::test]
3878 async fn test_request_instruments_different_client_ids() {
3879 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3881 set_data_event_sender(sender);
3882
3883 let timeout = tokio::time::Duration::from_secs(3);
3884
3885 let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3887 let config1 = DydxDataClientConfig::default();
3888 let http_client1 = DydxHttpClient::default();
3889 let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3890
3891 let request1 = RequestInstruments::new(
3892 None,
3893 None,
3894 Some(client_id_1),
3895 Some(*DYDX_VENUE),
3896 UUID4::new(),
3897 get_atomic_clock_realtime().get_time_ns(),
3898 None,
3899 );
3900
3901 assert!(client1.request_instruments(&request1).is_ok());
3902
3903 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3904 tokio::time::timeout(timeout, rx.recv()).await
3905 {
3906 assert_eq!(
3907 resp1.client_id, client_id_1,
3908 "Response should contain client_id_1"
3909 );
3910 }
3911
3912 let client_id_2 = ClientId::from("DYDX-CLIENT-2");
3914 let config2 = DydxDataClientConfig::default();
3915 let http_client2 = DydxHttpClient::default();
3916 let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
3917
3918 let request2 = RequestInstruments::new(
3919 None,
3920 None,
3921 Some(client_id_2),
3922 Some(*DYDX_VENUE),
3923 UUID4::new(),
3924 get_atomic_clock_realtime().get_time_ns(),
3925 None,
3926 );
3927
3928 assert!(client2.request_instruments(&request2).is_ok());
3929
3930 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
3931 tokio::time::timeout(timeout, rx.recv()).await
3932 {
3933 assert_eq!(
3934 resp2.client_id, client_id_2,
3935 "Response should contain client_id_2"
3936 );
3937 assert_ne!(
3938 resp2.client_id, client_id_1,
3939 "Different clients should have different client_ids"
3940 );
3941 }
3942 }
3943
3944 #[tokio::test]
3945 async fn test_request_instruments_no_timestamps() {
3946 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3948 set_data_event_sender(sender);
3949
3950 let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
3951 let config = DydxDataClientConfig::default();
3952 let http_client = DydxHttpClient::default();
3953 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3954
3955 let request = RequestInstruments::new(
3956 None, None, Some(client_id),
3959 Some(*DYDX_VENUE),
3960 UUID4::new(),
3961 get_atomic_clock_realtime().get_time_ns(),
3962 None,
3963 );
3964
3965 assert!(client.request_instruments(&request).is_ok());
3966
3967 let timeout = tokio::time::Duration::from_secs(5);
3968 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3969 tokio::time::timeout(timeout, rx.recv()).await
3970 {
3971 assert!(
3973 resp.start.is_none(),
3974 "start should be None when not provided"
3975 );
3976 assert!(resp.end.is_none(), "end should be None when not provided");
3977
3978 assert_eq!(resp.venue, *DYDX_VENUE);
3980 assert_eq!(resp.client_id, client_id);
3981 assert!(resp.ts_init > 0);
3982 }
3983 }
3984
3985 #[tokio::test]
3990 async fn test_request_instrument_cache_hit() {
3991 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3993 set_data_event_sender(sender);
3994
3995 let client_id = ClientId::from("DYDX-CACHE-HIT");
3996 let config = DydxDataClientConfig::default();
3997 let http_client = DydxHttpClient::default();
3998 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3999
4000 let instrument = create_test_instrument_any();
4002 let instrument_id = instrument.id();
4003 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4004 client.instruments.insert(symbol_key, instrument.clone());
4005
4006 let request = RequestInstrument::new(
4007 instrument_id,
4008 None,
4009 None,
4010 Some(client_id),
4011 UUID4::new(),
4012 get_atomic_clock_realtime().get_time_ns(),
4013 None,
4014 );
4015
4016 assert!(client.request_instrument(&request).is_ok());
4017
4018 let timeout = tokio::time::Duration::from_millis(500);
4020 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4021 tokio::time::timeout(timeout, rx.recv()).await
4022 {
4023 assert_eq!(resp.instrument_id, instrument_id);
4024 assert_eq!(resp.client_id, client_id);
4025 assert_eq!(resp.data.id(), instrument_id);
4026 }
4027 }
4028
4029 #[tokio::test]
4030 async fn test_request_instrument_cache_miss() {
4031 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4033 set_data_event_sender(sender);
4034
4035 let client_id = ClientId::from("DYDX-CACHE-MISS");
4036 let config = DydxDataClientConfig::default();
4037 let http_client = DydxHttpClient::default();
4038 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4039
4040 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4041
4042 let request = RequestInstrument::new(
4043 instrument_id,
4044 None,
4045 None,
4046 Some(client_id),
4047 UUID4::new(),
4048 get_atomic_clock_realtime().get_time_ns(),
4049 None,
4050 );
4051
4052 assert!(client.request_instrument(&request).is_ok());
4053
4054 let timeout = tokio::time::Duration::from_secs(5);
4056 let result = tokio::time::timeout(timeout, rx.recv()).await;
4057
4058 match result {
4060 Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4061 assert_eq!(resp.instrument_id, instrument_id);
4062 assert_eq!(resp.client_id, client_id);
4063 }
4064 Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4065 Ok(None) => panic!("Channel closed unexpectedly"),
4066 Err(_) => {
4067 println!("Test timed out - testnet may be unreachable");
4068 }
4069 }
4070 }
4071
4072 #[tokio::test]
4073 async fn test_request_instrument_not_found() {
4074 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4076 set_data_event_sender(sender);
4077
4078 let client_id = ClientId::from("DYDX-NOT-FOUND");
4079 let config = DydxDataClientConfig {
4080 base_url_http: Some("http://invalid-url.local".to_string()),
4081 ..Default::default()
4082 };
4083 let http_client = DydxHttpClient::new(
4084 config.base_url_http.clone(),
4085 config.http_timeout_secs,
4086 config.http_proxy_url.clone(),
4087 config.is_testnet,
4088 None,
4089 )
4090 .unwrap();
4091
4092 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4093
4094 let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4095
4096 let request = RequestInstrument::new(
4097 instrument_id,
4098 None,
4099 None,
4100 Some(client_id),
4101 UUID4::new(),
4102 get_atomic_clock_realtime().get_time_ns(),
4103 None,
4104 );
4105
4106 assert!(client.request_instrument(&request).is_ok());
4108
4109 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4111 }
4112
4113 #[tokio::test]
4114 async fn test_request_instrument_bulk_caching() {
4115 setup_test_env();
4117
4118 let client_id = ClientId::from("DYDX-BULK-CACHE");
4119 let config = DydxDataClientConfig::default();
4120 let http_client = DydxHttpClient::default();
4121 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4122
4123 let initial_cache_size = client.instruments.len();
4124
4125 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4126
4127 let request = RequestInstrument::new(
4128 instrument_id,
4129 None,
4130 None,
4131 Some(client_id),
4132 UUID4::new(),
4133 get_atomic_clock_realtime().get_time_ns(),
4134 None,
4135 );
4136
4137 assert!(client.request_instrument(&request).is_ok());
4138
4139 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4141
4142 let final_cache_size = client.instruments.len();
4144 assert!(final_cache_size >= initial_cache_size);
4145 }
4147
4148 #[tokio::test]
4149 async fn test_request_instrument_correlation_id() {
4150 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4152 set_data_event_sender(sender);
4153
4154 let client_id = ClientId::from("DYDX-CORR-ID");
4155 let config = DydxDataClientConfig::default();
4156 let http_client = DydxHttpClient::default();
4157 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4158
4159 let instrument = create_test_instrument_any();
4161 let instrument_id = instrument.id();
4162 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4163 client.instruments.insert(symbol_key, instrument.clone());
4164
4165 let request_id = UUID4::new();
4166 let request = RequestInstrument::new(
4167 instrument_id,
4168 None,
4169 None,
4170 Some(client_id),
4171 request_id,
4172 get_atomic_clock_realtime().get_time_ns(),
4173 None,
4174 );
4175
4176 assert!(client.request_instrument(&request).is_ok());
4177
4178 let timeout = tokio::time::Duration::from_millis(500);
4180 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4181 tokio::time::timeout(timeout, rx.recv()).await
4182 {
4183 assert_eq!(resp.correlation_id, request_id);
4184 }
4185 }
4186
4187 #[tokio::test]
4188 async fn test_request_instrument_response_format_boxed() {
4189 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4191 set_data_event_sender(sender);
4192
4193 let client_id = ClientId::from("DYDX-BOXED");
4194 let config = DydxDataClientConfig::default();
4195 let http_client = DydxHttpClient::default();
4196 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4197
4198 let instrument = create_test_instrument_any();
4200 let instrument_id = instrument.id();
4201 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4202 client.instruments.insert(symbol_key, instrument.clone());
4203
4204 let request = RequestInstrument::new(
4205 instrument_id,
4206 None,
4207 None,
4208 Some(client_id),
4209 UUID4::new(),
4210 get_atomic_clock_realtime().get_time_ns(),
4211 None,
4212 );
4213
4214 assert!(client.request_instrument(&request).is_ok());
4215
4216 let timeout = tokio::time::Duration::from_millis(500);
4218 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4219 tokio::time::timeout(timeout, rx.recv()).await
4220 {
4221 assert_eq!(boxed_resp.instrument_id, instrument_id);
4223 assert_eq!(boxed_resp.client_id, client_id);
4224 assert!(boxed_resp.start.is_none());
4225 assert!(boxed_resp.end.is_none());
4226 assert!(boxed_resp.ts_init > 0);
4227 }
4228 }
4229
4230 #[rstest]
4231 fn test_request_instrument_symbol_extraction() {
4232 setup_test_env();
4234
4235 let client_id = ClientId::from("DYDX-SYMBOL");
4236 let config = DydxDataClientConfig::default();
4237 let http_client = DydxHttpClient::default();
4238 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4239
4240 let test_cases = vec![
4243 ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4244 ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4245 ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4246 ];
4247
4248 for (instrument_id_str, expected_symbol) in test_cases {
4249 let instrument_id = InstrumentId::from(instrument_id_str);
4250 let symbol = Ustr::from(instrument_id.symbol.as_str());
4251 assert_eq!(symbol.as_str(), expected_symbol);
4252 }
4253 }
4254
4255 #[tokio::test]
4256 async fn test_request_instrument_client_id_fallback() {
4257 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4259 set_data_event_sender(sender);
4260
4261 let client_id = ClientId::from("DYDX-FALLBACK");
4262 let config = DydxDataClientConfig::default();
4263 let http_client = DydxHttpClient::default();
4264 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4265
4266 let instrument = create_test_instrument_any();
4268 let instrument_id = instrument.id();
4269 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4270 client.instruments.insert(symbol_key, instrument.clone());
4271
4272 let request = RequestInstrument::new(
4273 instrument_id,
4274 None,
4275 None,
4276 None, UUID4::new(),
4278 get_atomic_clock_realtime().get_time_ns(),
4279 None,
4280 );
4281
4282 assert!(client.request_instrument(&request).is_ok());
4283
4284 let timeout = tokio::time::Duration::from_millis(500);
4286 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4287 tokio::time::timeout(timeout, rx.recv()).await
4288 {
4289 assert_eq!(resp.client_id, client_id);
4290 }
4291 }
4292
4293 #[tokio::test]
4298 async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4299 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4300 set_data_event_sender(sender);
4301
4302 let created_at = chrono::Utc::now();
4303
4304 let http_trade = crate::http::models::Trade {
4305 id: "trade-1".to_string(),
4306 side: OrderSide::Buy,
4307 size: dec!(1.5),
4308 price: dec!(100.25),
4309 created_at,
4310 created_at_height: 1,
4311 trade_type: crate::common::enums::DydxTradeType::Limit,
4312 };
4313
4314 let trades_response = crate::http::models::TradesResponse {
4315 trades: vec![http_trade],
4316 };
4317
4318 let state = TradesTestState {
4319 response: Arc::new(trades_response),
4320 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4321 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4322 };
4323
4324 let addr = start_trades_test_server(state.clone()).await;
4325 let base_url = format!("http://{}", addr);
4326
4327 let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4328 let config = DydxDataClientConfig {
4329 base_url_http: Some(base_url),
4330 is_testnet: true,
4331 ..Default::default()
4332 };
4333
4334 let http_client = DydxHttpClient::new(
4335 config.base_url_http.clone(),
4336 config.http_timeout_secs,
4337 config.http_proxy_url.clone(),
4338 config.is_testnet,
4339 None,
4340 )
4341 .unwrap();
4342
4343 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4344
4345 let instrument = create_test_instrument_any();
4346 let instrument_id = instrument.id();
4347 let price_precision = instrument.price_precision();
4348 let size_precision = instrument.size_precision();
4349 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4350 client.instruments.insert(symbol_key, instrument);
4351
4352 let request_id = UUID4::new();
4353 let now = chrono::Utc::now();
4354 let start = Some(now - chrono::Duration::seconds(10));
4355 let end = Some(now + chrono::Duration::seconds(10));
4356 let limit = std::num::NonZeroUsize::new(100).unwrap();
4357
4358 let request = RequestTrades::new(
4359 instrument_id,
4360 start,
4361 end,
4362 Some(limit),
4363 Some(client_id),
4364 request_id,
4365 get_atomic_clock_realtime().get_time_ns(),
4366 None,
4367 );
4368
4369 assert!(client.request_trades(&request).is_ok());
4370
4371 let timeout = tokio::time::Duration::from_secs(1);
4372 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4373 tokio::time::timeout(timeout, rx.recv()).await
4374 {
4375 assert_eq!(resp.correlation_id, request_id);
4376 assert_eq!(resp.client_id, client_id);
4377 assert_eq!(resp.instrument_id, instrument_id);
4378 assert_eq!(resp.data.len(), 1);
4379
4380 let tick = &resp.data[0];
4381 assert_eq!(tick.instrument_id, instrument_id);
4382 assert_eq!(tick.price, Price::new(100.25, price_precision));
4383 assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4384 assert_eq!(tick.trade_id.to_string(), "trade-1");
4385
4386 use nautilus_model::enums::AggressorSide;
4387 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4388 } else {
4389 panic!("did not receive trades response in time");
4390 }
4391
4392 let last_ticker = state.last_ticker.lock().await.clone();
4394 assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4395
4396 let last_limit = *state.last_limit.lock().await;
4397 assert_eq!(last_limit, Some(Some(100)));
4398 }
4399
4400 #[tokio::test]
4401 async fn test_request_trades_empty_response_and_no_limit() {
4402 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4403 set_data_event_sender(sender);
4404
4405 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4406
4407 let state = TradesTestState {
4408 response: Arc::new(trades_response),
4409 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4410 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4411 };
4412
4413 let addr = start_trades_test_server(state.clone()).await;
4414 let base_url = format!("http://{}", addr);
4415
4416 let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4417 let config = DydxDataClientConfig {
4418 base_url_http: Some(base_url),
4419 is_testnet: true,
4420 ..Default::default()
4421 };
4422
4423 let http_client = DydxHttpClient::new(
4424 config.base_url_http.clone(),
4425 config.http_timeout_secs,
4426 config.http_proxy_url.clone(),
4427 config.is_testnet,
4428 None,
4429 )
4430 .unwrap();
4431
4432 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4433
4434 let instrument = create_test_instrument_any();
4435 let instrument_id = instrument.id();
4436 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4437 client.instruments.insert(symbol_key, instrument);
4438
4439 let request_id = UUID4::new();
4440
4441 let request = RequestTrades::new(
4442 instrument_id,
4443 None,
4444 None,
4445 None, Some(client_id),
4447 request_id,
4448 get_atomic_clock_realtime().get_time_ns(),
4449 None,
4450 );
4451
4452 assert!(client.request_trades(&request).is_ok());
4453
4454 let timeout = tokio::time::Duration::from_secs(1);
4455 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4456 tokio::time::timeout(timeout, rx.recv()).await
4457 {
4458 assert_eq!(resp.correlation_id, request_id);
4459 assert_eq!(resp.client_id, client_id);
4460 assert_eq!(resp.instrument_id, instrument_id);
4461 assert!(resp.data.is_empty());
4462 } else {
4463 panic!("did not receive trades response in time");
4464 }
4465
4466 let last_limit = *state.last_limit.lock().await;
4468 assert_eq!(last_limit, Some(None));
4469 }
4470
4471 #[tokio::test]
4472 async fn test_request_trades_timestamp_filtering() {
4473 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4474 set_data_event_sender(sender);
4475
4476 let now = chrono::Utc::now();
4477 let trade_before = crate::http::models::Trade {
4478 id: "before".to_string(),
4479 side: OrderSide::Buy,
4480 size: dec!(1.0),
4481 price: dec!(100.0),
4482 created_at: now - chrono::Duration::seconds(60),
4483 created_at_height: 1,
4484 trade_type: crate::common::enums::DydxTradeType::Limit,
4485 };
4486 let trade_inside = crate::http::models::Trade {
4487 id: "inside".to_string(),
4488 side: OrderSide::Sell,
4489 size: dec!(2.0),
4490 price: dec!(101.0),
4491 created_at: now,
4492 created_at_height: 2,
4493 trade_type: crate::common::enums::DydxTradeType::Limit,
4494 };
4495 let trade_after = crate::http::models::Trade {
4496 id: "after".to_string(),
4497 side: OrderSide::Buy,
4498 size: dec!(3.0),
4499 price: dec!(102.0),
4500 created_at: now + chrono::Duration::seconds(60),
4501 created_at_height: 3,
4502 trade_type: crate::common::enums::DydxTradeType::Limit,
4503 };
4504
4505 let trades_response = crate::http::models::TradesResponse {
4506 trades: vec![trade_before, trade_inside.clone(), trade_after],
4507 };
4508
4509 let state = TradesTestState {
4510 response: Arc::new(trades_response),
4511 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4512 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4513 };
4514
4515 let addr = start_trades_test_server(state).await;
4516 let base_url = format!("http://{}", addr);
4517
4518 let client_id = ClientId::from("DYDX-TRADES-FILTER");
4519 let config = DydxDataClientConfig {
4520 base_url_http: Some(base_url),
4521 is_testnet: true,
4522 ..Default::default()
4523 };
4524
4525 let http_client = DydxHttpClient::new(
4526 config.base_url_http.clone(),
4527 config.http_timeout_secs,
4528 config.http_proxy_url.clone(),
4529 config.is_testnet,
4530 None,
4531 )
4532 .unwrap();
4533
4534 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4535
4536 let instrument = create_test_instrument_any();
4537 let instrument_id = instrument.id();
4538 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4539 client.instruments.insert(symbol_key, instrument);
4540
4541 let request_id = UUID4::new();
4542
4543 let start = Some(now - chrono::Duration::seconds(10));
4545 let end = Some(now + chrono::Duration::seconds(10));
4546
4547 let request = RequestTrades::new(
4548 instrument_id,
4549 start,
4550 end,
4551 None,
4552 Some(client_id),
4553 request_id,
4554 get_atomic_clock_realtime().get_time_ns(),
4555 None,
4556 );
4557
4558 assert!(client.request_trades(&request).is_ok());
4559
4560 let timeout = tokio::time::Duration::from_secs(1);
4561 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4562 tokio::time::timeout(timeout, rx.recv()).await
4563 {
4564 assert_eq!(resp.correlation_id, request_id);
4565 assert_eq!(resp.client_id, client_id);
4566 assert_eq!(resp.instrument_id, instrument_id);
4567 assert_eq!(resp.data.len(), 1);
4568
4569 let tick = &resp.data[0];
4570 assert_eq!(tick.trade_id.to_string(), "inside");
4571 assert_eq!(tick.price.as_decimal(), dec!(101.0));
4572 } else {
4573 panic!("did not receive trades response in time");
4574 }
4575 }
4576
4577 #[tokio::test]
4578 async fn test_request_trades_correlation_id_matching() {
4579 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4581 set_data_event_sender(sender);
4582
4583 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4584
4585 let state = TradesTestState {
4586 response: Arc::new(trades_response),
4587 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4588 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4589 };
4590
4591 let addr = start_trades_test_server(state).await;
4592 let base_url = format!("http://{}", addr);
4593
4594 let client_id = ClientId::from("DYDX-TRADES-CORR");
4595 let config = DydxDataClientConfig {
4596 base_url_http: Some(base_url),
4597 is_testnet: true,
4598 ..Default::default()
4599 };
4600
4601 let http_client = DydxHttpClient::new(
4602 config.base_url_http.clone(),
4603 config.http_timeout_secs,
4604 config.http_proxy_url.clone(),
4605 config.is_testnet,
4606 None,
4607 )
4608 .unwrap();
4609
4610 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4611
4612 let instrument = create_test_instrument_any();
4613 let instrument_id = instrument.id();
4614 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4615 client.instruments.insert(symbol_key, instrument);
4616
4617 let request_id = UUID4::new();
4618 let request = RequestTrades::new(
4619 instrument_id,
4620 None,
4621 None,
4622 None,
4623 Some(client_id),
4624 request_id,
4625 get_atomic_clock_realtime().get_time_ns(),
4626 None,
4627 );
4628
4629 assert!(client.request_trades(&request).is_ok());
4630
4631 let timeout = tokio::time::Duration::from_millis(500);
4632 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4633 tokio::time::timeout(timeout, rx.recv()).await
4634 {
4635 assert_eq!(resp.correlation_id, request_id);
4636 }
4637 }
4638
4639 #[tokio::test]
4640 async fn test_request_trades_response_format() {
4641 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4643 set_data_event_sender(sender);
4644
4645 let created_at = chrono::Utc::now();
4646 let http_trade = crate::http::models::Trade {
4647 id: "format-test".to_string(),
4648 side: OrderSide::Sell,
4649 size: dec!(5.0),
4650 price: dec!(200.0),
4651 created_at,
4652 created_at_height: 100,
4653 trade_type: crate::common::enums::DydxTradeType::Limit,
4654 };
4655
4656 let trades_response = crate::http::models::TradesResponse {
4657 trades: vec![http_trade],
4658 };
4659
4660 let state = TradesTestState {
4661 response: Arc::new(trades_response),
4662 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4663 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4664 };
4665
4666 let addr = start_trades_test_server(state).await;
4667 let base_url = format!("http://{}", addr);
4668
4669 let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4670 let config = DydxDataClientConfig {
4671 base_url_http: Some(base_url),
4672 is_testnet: true,
4673 ..Default::default()
4674 };
4675
4676 let http_client = DydxHttpClient::new(
4677 config.base_url_http.clone(),
4678 config.http_timeout_secs,
4679 config.http_proxy_url.clone(),
4680 config.is_testnet,
4681 None,
4682 )
4683 .unwrap();
4684
4685 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4686
4687 let instrument = create_test_instrument_any();
4688 let instrument_id = instrument.id();
4689 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4690 client.instruments.insert(symbol_key, instrument);
4691
4692 let request = RequestTrades::new(
4693 instrument_id,
4694 None,
4695 None,
4696 None,
4697 Some(client_id),
4698 UUID4::new(),
4699 get_atomic_clock_realtime().get_time_ns(),
4700 None,
4701 );
4702
4703 assert!(client.request_trades(&request).is_ok());
4704
4705 let timeout = tokio::time::Duration::from_millis(500);
4706 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4707 tokio::time::timeout(timeout, rx.recv()).await
4708 {
4709 assert_eq!(resp.client_id, client_id);
4711 assert_eq!(resp.instrument_id, instrument_id);
4712 assert!(resp.data.len() == 1);
4713 assert!(resp.ts_init > 0);
4714
4715 let tick = &resp.data[0];
4717 assert_eq!(tick.instrument_id, instrument_id);
4718 assert!(tick.ts_event > 0);
4719 assert!(tick.ts_init > 0);
4720 }
4721 }
4722
4723 #[tokio::test]
4724 async fn test_request_trades_no_instrument_in_cache() {
4725 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4727 set_data_event_sender(sender);
4728
4729 let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4730 let config = DydxDataClientConfig::default();
4731 let http_client = DydxHttpClient::default();
4732 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4733
4734 let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4736
4737 let request = RequestTrades::new(
4738 instrument_id,
4739 None,
4740 None,
4741 None,
4742 Some(client_id),
4743 UUID4::new(),
4744 get_atomic_clock_realtime().get_time_ns(),
4745 None,
4746 );
4747
4748 assert!(client.request_trades(&request).is_ok());
4749
4750 let timeout = tokio::time::Duration::from_millis(500);
4752 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4753 tokio::time::timeout(timeout, rx.recv()).await
4754 {
4755 assert!(resp.data.is_empty());
4756 }
4757 }
4758
4759 #[tokio::test]
4760 async fn test_request_trades_limit_parameter() {
4761 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4763 set_data_event_sender(sender);
4764
4765 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4766
4767 let state = TradesTestState {
4768 response: Arc::new(trades_response),
4769 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4770 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4771 };
4772
4773 let addr = start_trades_test_server(state.clone()).await;
4774 let base_url = format!("http://{}", addr);
4775
4776 let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4777 let config = DydxDataClientConfig {
4778 base_url_http: Some(base_url),
4779 is_testnet: true,
4780 ..Default::default()
4781 };
4782
4783 let http_client = DydxHttpClient::new(
4784 config.base_url_http.clone(),
4785 config.http_timeout_secs,
4786 config.http_proxy_url.clone(),
4787 config.is_testnet,
4788 None,
4789 )
4790 .unwrap();
4791
4792 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4793
4794 let instrument = create_test_instrument_any();
4795 let instrument_id = instrument.id();
4796 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4797 client.instruments.insert(symbol_key, instrument);
4798
4799 let limit = std::num::NonZeroUsize::new(500).unwrap();
4801 let request = RequestTrades::new(
4802 instrument_id,
4803 None,
4804 None,
4805 Some(limit),
4806 Some(client_id),
4807 UUID4::new(),
4808 get_atomic_clock_realtime().get_time_ns(),
4809 None,
4810 );
4811
4812 assert!(client.request_trades(&request).is_ok());
4813 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
4814
4815 let last_limit = *state.last_limit.lock().await;
4817 assert_eq!(last_limit, Some(Some(500)));
4818 }
4819
4820 #[rstest]
4821 fn test_request_trades_symbol_conversion() {
4822 setup_test_env();
4824
4825 let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4826 let config = DydxDataClientConfig::default();
4827 let http_client = DydxHttpClient::default();
4828 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4829
4830 let test_cases = vec![
4832 ("BTC-USD-PERP.DYDX", "BTC-USD"),
4833 ("ETH-USD-PERP.DYDX", "ETH-USD"),
4834 ("SOL-USD-PERP.DYDX", "SOL-USD"),
4835 ];
4836
4837 for (instrument_id_str, expected_ticker) in test_cases {
4838 let instrument_id = InstrumentId::from(instrument_id_str);
4839 let ticker = instrument_id
4840 .symbol
4841 .as_str()
4842 .trim_end_matches("-PERP")
4843 .to_string();
4844 assert_eq!(ticker, expected_ticker);
4845 }
4846 }
4847
4848 #[tokio::test]
4853 async fn test_http_404_handling() {
4854 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4856 set_data_event_sender(sender);
4857
4858 let client_id = ClientId::from("DYDX-404");
4859 let config = DydxDataClientConfig {
4860 base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4861 http_timeout_secs: Some(1),
4862 ..Default::default()
4863 };
4864
4865 let http_client = DydxHttpClient::new(
4866 config.base_url_http.clone(),
4867 config.http_timeout_secs,
4868 config.http_proxy_url.clone(),
4869 config.is_testnet,
4870 None,
4871 )
4872 .unwrap();
4873
4874 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4875
4876 let request = RequestInstruments::new(
4877 None,
4878 None,
4879 Some(client_id),
4880 Some(*DYDX_VENUE),
4881 UUID4::new(),
4882 get_atomic_clock_realtime().get_time_ns(),
4883 None,
4884 );
4885
4886 assert!(client.request_instruments(&request).is_ok());
4887
4888 let timeout = tokio::time::Duration::from_secs(2);
4890 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4891 tokio::time::timeout(timeout, rx.recv()).await
4892 {
4893 assert!(resp.data.is_empty(), "Expected empty response on 404");
4894 }
4895 }
4896
4897 #[tokio::test]
4898 async fn test_http_500_handling() {
4899 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4901 set_data_event_sender(sender);
4902
4903 let client_id = ClientId::from("DYDX-500");
4904 let config = DydxDataClientConfig {
4905 base_url_http: Some("http://httpstat.us/500".to_string()),
4906 http_timeout_secs: Some(2),
4907 ..Default::default()
4908 };
4909
4910 let http_client = DydxHttpClient::new(
4911 config.base_url_http.clone(),
4912 config.http_timeout_secs,
4913 config.http_proxy_url.clone(),
4914 config.is_testnet,
4915 None,
4916 )
4917 .unwrap();
4918
4919 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4920
4921 let instrument = create_test_instrument_any();
4922 let instrument_id = instrument.id();
4923 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4924 client.instruments.insert(symbol_key, instrument);
4925
4926 let request = RequestTrades::new(
4927 instrument_id,
4928 None,
4929 None,
4930 None,
4931 Some(client_id),
4932 UUID4::new(),
4933 get_atomic_clock_realtime().get_time_ns(),
4934 None,
4935 );
4936
4937 assert!(client.request_trades(&request).is_ok());
4938
4939 let timeout = tokio::time::Duration::from_secs(3);
4941 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4942 tokio::time::timeout(timeout, rx.recv()).await
4943 {
4944 assert!(resp.data.is_empty(), "Expected empty response on 500");
4945 }
4946 }
4947
4948 #[tokio::test]
4949 async fn test_network_timeout_handling() {
4950 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4952 set_data_event_sender(sender);
4953
4954 let client_id = ClientId::from("DYDX-TIMEOUT");
4955 let config = DydxDataClientConfig {
4956 base_url_http: Some("http://10.255.255.1:81".to_string()), http_timeout_secs: Some(1), ..Default::default()
4959 };
4960
4961 let http_client = DydxHttpClient::new(
4962 config.base_url_http.clone(),
4963 config.http_timeout_secs,
4964 config.http_proxy_url.clone(),
4965 config.is_testnet,
4966 None,
4967 )
4968 .unwrap();
4969
4970 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4971
4972 let request = RequestInstruments::new(
4973 None,
4974 None,
4975 Some(client_id),
4976 Some(*DYDX_VENUE),
4977 UUID4::new(),
4978 get_atomic_clock_realtime().get_time_ns(),
4979 None,
4980 );
4981
4982 assert!(client.request_instruments(&request).is_ok());
4983
4984 let timeout = tokio::time::Duration::from_secs(3);
4986 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4987 tokio::time::timeout(timeout, rx.recv()).await
4988 {
4989 assert!(resp.data.is_empty(), "Expected empty response on timeout");
4990 }
4991 }
4992
4993 #[tokio::test]
4994 async fn test_connection_refused_handling() {
4995 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4997 set_data_event_sender(sender);
4998
4999 let client_id = ClientId::from("DYDX-REFUSED");
5000 let config = DydxDataClientConfig {
5001 base_url_http: Some("http://localhost:9999".to_string()), http_timeout_secs: Some(1),
5003 ..Default::default()
5004 };
5005
5006 let http_client = DydxHttpClient::new(
5007 config.base_url_http.clone(),
5008 config.http_timeout_secs,
5009 config.http_proxy_url.clone(),
5010 config.is_testnet,
5011 None,
5012 )
5013 .unwrap();
5014
5015 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5016
5017 let instrument = create_test_instrument_any();
5018 let instrument_id = instrument.id();
5019 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5020 client.instruments.insert(symbol_key, instrument);
5021
5022 let request = RequestInstrument::new(
5023 instrument_id,
5024 None,
5025 None,
5026 Some(client_id),
5027 UUID4::new(),
5028 get_atomic_clock_realtime().get_time_ns(),
5029 None,
5030 );
5031
5032 assert!(client.request_instrument(&request).is_ok());
5033
5034 let timeout = tokio::time::Duration::from_secs(2);
5036 let result = tokio::time::timeout(timeout, rx.recv()).await;
5037
5038 match result {
5041 Ok(Some(DataEvent::Response(_))) => {
5042 }
5044 Ok(None) | Err(_) => {
5045 }
5047 _ => {}
5048 }
5049 }
5050
5051 #[tokio::test]
5052 async fn test_dns_resolution_failure_handling() {
5053 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5055 set_data_event_sender(sender);
5056
5057 let client_id = ClientId::from("DYDX-DNS");
5058 let config = DydxDataClientConfig {
5059 base_url_http: Some(
5060 "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5061 ),
5062 http_timeout_secs: Some(2),
5063 ..Default::default()
5064 };
5065
5066 let http_client = DydxHttpClient::new(
5067 config.base_url_http.clone(),
5068 config.http_timeout_secs,
5069 config.http_proxy_url.clone(),
5070 config.is_testnet,
5071 None,
5072 )
5073 .unwrap();
5074
5075 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5076
5077 let request = RequestInstruments::new(
5078 None,
5079 None,
5080 Some(client_id),
5081 Some(*DYDX_VENUE),
5082 UUID4::new(),
5083 get_atomic_clock_realtime().get_time_ns(),
5084 None,
5085 );
5086
5087 assert!(client.request_instruments(&request).is_ok());
5088
5089 let timeout = tokio::time::Duration::from_secs(3);
5091 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5092 tokio::time::timeout(timeout, rx.recv()).await
5093 {
5094 assert!(
5095 resp.data.is_empty(),
5096 "Expected empty response on DNS failure"
5097 );
5098 }
5099 }
5100
5101 #[tokio::test]
5102 async fn test_http_502_503_handling() {
5103 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5105 set_data_event_sender(sender);
5106
5107 let client_id = ClientId::from("DYDX-503");
5108 let config = DydxDataClientConfig {
5109 base_url_http: Some("http://httpstat.us/503".to_string()),
5110 http_timeout_secs: Some(2),
5111 ..Default::default()
5112 };
5113
5114 let http_client = DydxHttpClient::new(
5115 config.base_url_http.clone(),
5116 config.http_timeout_secs,
5117 config.http_proxy_url.clone(),
5118 config.is_testnet,
5119 None,
5120 )
5121 .unwrap();
5122
5123 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5124
5125 let request = RequestInstruments::new(
5126 None,
5127 None,
5128 Some(client_id),
5129 Some(*DYDX_VENUE),
5130 UUID4::new(),
5131 get_atomic_clock_realtime().get_time_ns(),
5132 None,
5133 );
5134
5135 assert!(client.request_instruments(&request).is_ok());
5136
5137 let timeout = tokio::time::Duration::from_secs(3);
5139 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5140 tokio::time::timeout(timeout, rx.recv()).await
5141 {
5142 assert!(resp.data.is_empty(), "Expected empty response on 503");
5143 }
5144 }
5145
5146 #[tokio::test]
5147 async fn test_http_429_rate_limit_handling() {
5148 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5150 set_data_event_sender(sender);
5151
5152 let client_id = ClientId::from("DYDX-429");
5153 let config = DydxDataClientConfig {
5154 base_url_http: Some("http://httpstat.us/429".to_string()),
5155 http_timeout_secs: Some(2),
5156 ..Default::default()
5157 };
5158
5159 let http_client = DydxHttpClient::new(
5160 config.base_url_http.clone(),
5161 config.http_timeout_secs,
5162 config.http_proxy_url.clone(),
5163 config.is_testnet,
5164 None,
5165 )
5166 .unwrap();
5167
5168 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5169
5170 let instrument = create_test_instrument_any();
5171 let instrument_id = instrument.id();
5172 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5173 client.instruments.insert(symbol_key, instrument);
5174
5175 let request = RequestTrades::new(
5176 instrument_id,
5177 None,
5178 None,
5179 None,
5180 Some(client_id),
5181 UUID4::new(),
5182 get_atomic_clock_realtime().get_time_ns(),
5183 None,
5184 );
5185
5186 assert!(client.request_trades(&request).is_ok());
5187
5188 let timeout = tokio::time::Duration::from_secs(3);
5190 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5191 tokio::time::timeout(timeout, rx.recv()).await
5192 {
5193 assert!(
5194 resp.data.is_empty(),
5195 "Expected empty response on rate limit"
5196 );
5197 }
5198 }
5199
5200 #[tokio::test]
5201 async fn test_error_handling_does_not_panic() {
5202 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5204 set_data_event_sender(sender);
5205
5206 let client_id = ClientId::from("DYDX-NO-PANIC");
5207 let config = DydxDataClientConfig {
5208 base_url_http: Some("http://invalid".to_string()),
5209 ..Default::default()
5210 };
5211
5212 let http_client = DydxHttpClient::new(
5213 config.base_url_http.clone(),
5214 config.http_timeout_secs,
5215 config.http_proxy_url.clone(),
5216 config.is_testnet,
5217 None,
5218 )
5219 .unwrap();
5220
5221 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5222
5223 let request_instruments = RequestInstruments::new(
5225 None,
5226 None,
5227 Some(client_id),
5228 Some(*DYDX_VENUE),
5229 UUID4::new(),
5230 get_atomic_clock_realtime().get_time_ns(),
5231 None,
5232 );
5233 assert!(client.request_instruments(&request_instruments).is_ok());
5234
5235 let instrument_id = InstrumentId::from("INVALID.DYDX");
5236 let request_instrument = RequestInstrument::new(
5237 instrument_id,
5238 None,
5239 None,
5240 Some(client_id),
5241 UUID4::new(),
5242 get_atomic_clock_realtime().get_time_ns(),
5243 None,
5244 );
5245 assert!(client.request_instrument(&request_instrument).is_ok());
5246
5247 let request_trades = RequestTrades::new(
5248 instrument_id,
5249 None,
5250 None,
5251 None,
5252 Some(client_id),
5253 UUID4::new(),
5254 get_atomic_clock_realtime().get_time_ns(),
5255 None,
5256 );
5257 assert!(client.request_trades(&request_trades).is_ok());
5258 }
5259
5260 #[tokio::test]
5265 async fn test_malformed_json_response() {
5266 use axum::{Router, routing::get};
5268
5269 #[derive(Clone)]
5270 struct MalformedState;
5271
5272 async fn malformed_markets_handler() -> String {
5273 r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5275 }
5276
5277 let app = Router::new()
5278 .route("/v4/markets", get(malformed_markets_handler))
5279 .with_state(MalformedState);
5280
5281 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5282 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5283 let port = listener.local_addr().unwrap().port();
5284
5285 tokio::spawn(async move {
5286 axum::serve(listener, app).await.unwrap();
5287 });
5288
5289 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5290
5291 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5292 set_data_event_sender(sender);
5293
5294 let client_id = ClientId::from("DYDX-MALFORMED");
5295 let config = DydxDataClientConfig {
5296 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5297 http_timeout_secs: Some(2),
5298 ..Default::default()
5299 };
5300
5301 let http_client = DydxHttpClient::new(
5302 config.base_url_http.clone(),
5303 config.http_timeout_secs,
5304 config.http_proxy_url.clone(),
5305 config.is_testnet,
5306 None,
5307 )
5308 .unwrap();
5309
5310 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5311
5312 let request = RequestInstruments::new(
5313 None,
5314 None,
5315 Some(client_id),
5316 Some(*DYDX_VENUE),
5317 UUID4::new(),
5318 get_atomic_clock_realtime().get_time_ns(),
5319 None,
5320 );
5321
5322 assert!(client.request_instruments(&request).is_ok());
5323
5324 let timeout = tokio::time::Duration::from_secs(3);
5326 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5327 tokio::time::timeout(timeout, rx.recv()).await
5328 {
5329 assert!(
5330 resp.data.is_empty(),
5331 "Expected empty response on malformed JSON"
5332 );
5333 }
5334 }
5335
5336 #[tokio::test]
5337 async fn test_missing_required_fields_in_response() {
5338 use axum::{Json, Router, routing::get};
5340 use serde_json::{Value, json};
5341
5342 #[derive(Clone)]
5343 struct MissingFieldsState;
5344
5345 async fn missing_fields_handler() -> Json<Value> {
5346 Json(json!({
5348 "markets": {
5349 "BTC-USD": {
5350 "status": "ACTIVE",
5352 "baseAsset": "BTC",
5353 "quoteAsset": "USD",
5354 }
5356 }
5357 }))
5358 }
5359
5360 let app = Router::new()
5361 .route("/v4/markets", get(missing_fields_handler))
5362 .with_state(MissingFieldsState);
5363
5364 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5365 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5366 let port = listener.local_addr().unwrap().port();
5367
5368 tokio::spawn(async move {
5369 axum::serve(listener, app).await.unwrap();
5370 });
5371
5372 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5373
5374 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5375 set_data_event_sender(sender);
5376
5377 let client_id = ClientId::from("DYDX-MISSING");
5378 let config = DydxDataClientConfig {
5379 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5380 http_timeout_secs: Some(2),
5381 ..Default::default()
5382 };
5383
5384 let http_client = DydxHttpClient::new(
5385 config.base_url_http.clone(),
5386 config.http_timeout_secs,
5387 config.http_proxy_url.clone(),
5388 config.is_testnet,
5389 None,
5390 )
5391 .unwrap();
5392
5393 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5394
5395 let request = RequestInstruments::new(
5396 None,
5397 None,
5398 Some(client_id),
5399 Some(*DYDX_VENUE),
5400 UUID4::new(),
5401 get_atomic_clock_realtime().get_time_ns(),
5402 None,
5403 );
5404
5405 assert!(client.request_instruments(&request).is_ok());
5406
5407 let timeout = tokio::time::Duration::from_secs(3);
5409 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5410 tokio::time::timeout(timeout, rx.recv()).await
5411 {
5412 assert!(resp.correlation_id == request.request_id);
5415 }
5416 }
5417
5418 #[tokio::test]
5419 async fn test_invalid_data_types_in_response() {
5420 use axum::{Json, Router, routing::get};
5422 use serde_json::{Value, json};
5423
5424 #[derive(Clone)]
5425 struct InvalidTypesState;
5426
5427 async fn invalid_types_handler() -> Json<Value> {
5428 Json(json!({
5430 "markets": {
5431 "BTC-USD": {
5432 "ticker": "BTC-USD",
5433 "status": "ACTIVE",
5434 "baseAsset": "BTC",
5435 "quoteAsset": "USD",
5436 "stepSize": "not_a_number", "tickSize": true, "minOrderSize": ["array"], "market": 12345, }
5441 }
5442 }))
5443 }
5444
5445 let app = Router::new()
5446 .route("/v4/markets", get(invalid_types_handler))
5447 .with_state(InvalidTypesState);
5448
5449 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5450 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5451 let port = listener.local_addr().unwrap().port();
5452
5453 tokio::spawn(async move {
5454 axum::serve(listener, app).await.unwrap();
5455 });
5456
5457 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5458
5459 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5460 set_data_event_sender(sender);
5461
5462 let client_id = ClientId::from("DYDX-TYPES");
5463 let config = DydxDataClientConfig {
5464 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5465 http_timeout_secs: Some(2),
5466 ..Default::default()
5467 };
5468
5469 let http_client = DydxHttpClient::new(
5470 config.base_url_http.clone(),
5471 config.http_timeout_secs,
5472 config.http_proxy_url.clone(),
5473 config.is_testnet,
5474 None,
5475 )
5476 .unwrap();
5477
5478 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5479
5480 let request = RequestInstruments::new(
5481 None,
5482 None,
5483 Some(client_id),
5484 Some(*DYDX_VENUE),
5485 UUID4::new(),
5486 get_atomic_clock_realtime().get_time_ns(),
5487 None,
5488 );
5489
5490 assert!(client.request_instruments(&request).is_ok());
5491
5492 let timeout = tokio::time::Duration::from_secs(3);
5494 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5495 tokio::time::timeout(timeout, rx.recv()).await
5496 {
5497 assert!(resp.correlation_id == request.request_id);
5499 }
5500 }
5501
5502 #[tokio::test]
5503 async fn test_unexpected_response_structure() {
5504 use axum::{Json, Router, routing::get};
5506 use serde_json::{Value, json};
5507
5508 #[derive(Clone)]
5509 struct UnexpectedState;
5510
5511 async fn unexpected_structure_handler() -> Json<Value> {
5512 Json(json!({
5514 "error": "Something went wrong",
5515 "code": 500,
5516 "data": null,
5517 "unexpected_field": {
5518 "nested": {
5519 "deeply": [1, 2, 3]
5520 }
5521 }
5522 }))
5523 }
5524
5525 let app = Router::new()
5526 .route("/v4/markets", get(unexpected_structure_handler))
5527 .with_state(UnexpectedState);
5528
5529 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5530 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5531 let port = listener.local_addr().unwrap().port();
5532
5533 tokio::spawn(async move {
5534 axum::serve(listener, app).await.unwrap();
5535 });
5536
5537 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5538
5539 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5540 set_data_event_sender(sender);
5541
5542 let client_id = ClientId::from("DYDX-STRUCT");
5543 let config = DydxDataClientConfig {
5544 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5545 http_timeout_secs: Some(2),
5546 ..Default::default()
5547 };
5548
5549 let http_client = DydxHttpClient::new(
5550 config.base_url_http.clone(),
5551 config.http_timeout_secs,
5552 config.http_proxy_url.clone(),
5553 config.is_testnet,
5554 None,
5555 )
5556 .unwrap();
5557
5558 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5559
5560 let request = RequestInstruments::new(
5561 None,
5562 None,
5563 Some(client_id),
5564 Some(*DYDX_VENUE),
5565 UUID4::new(),
5566 get_atomic_clock_realtime().get_time_ns(),
5567 None,
5568 );
5569
5570 assert!(client.request_instruments(&request).is_ok());
5571
5572 let timeout = tokio::time::Duration::from_secs(3);
5574 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5575 tokio::time::timeout(timeout, rx.recv()).await
5576 {
5577 assert!(
5578 resp.data.is_empty(),
5579 "Expected empty response on unexpected structure"
5580 );
5581 assert!(resp.correlation_id == request.request_id);
5582 }
5583 }
5584
5585 #[tokio::test]
5586 async fn test_empty_markets_object_in_response() {
5587 use axum::{Json, Router, routing::get};
5589 use serde_json::{Value, json};
5590
5591 #[derive(Clone)]
5592 struct EmptyMarketsState;
5593
5594 async fn empty_markets_handler() -> Json<Value> {
5595 Json(json!({
5596 "markets": {}
5597 }))
5598 }
5599
5600 let app = Router::new()
5601 .route("/v4/markets", get(empty_markets_handler))
5602 .with_state(EmptyMarketsState);
5603
5604 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5605 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5606 let port = listener.local_addr().unwrap().port();
5607
5608 tokio::spawn(async move {
5609 axum::serve(listener, app).await.unwrap();
5610 });
5611
5612 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5613
5614 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5615 set_data_event_sender(sender);
5616
5617 let client_id = ClientId::from("DYDX-EMPTY");
5618 let config = DydxDataClientConfig {
5619 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5620 http_timeout_secs: Some(2),
5621 ..Default::default()
5622 };
5623
5624 let http_client = DydxHttpClient::new(
5625 config.base_url_http.clone(),
5626 config.http_timeout_secs,
5627 config.http_proxy_url.clone(),
5628 config.is_testnet,
5629 None,
5630 )
5631 .unwrap();
5632
5633 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5634
5635 let request = RequestInstruments::new(
5636 None,
5637 None,
5638 Some(client_id),
5639 Some(*DYDX_VENUE),
5640 UUID4::new(),
5641 get_atomic_clock_realtime().get_time_ns(),
5642 None,
5643 );
5644
5645 assert!(client.request_instruments(&request).is_ok());
5646
5647 let timeout = tokio::time::Duration::from_secs(3);
5649 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5650 tokio::time::timeout(timeout, rx.recv()).await
5651 {
5652 assert!(
5653 resp.data.is_empty(),
5654 "Expected empty response for empty markets"
5655 );
5656 assert!(resp.correlation_id == request.request_id);
5657 }
5658 }
5659
5660 #[tokio::test]
5661 async fn test_null_values_in_response() {
5662 use axum::{Json, Router, routing::get};
5664 use serde_json::{Value, json};
5665
5666 #[derive(Clone)]
5667 struct NullValuesState;
5668
5669 async fn null_values_handler() -> Json<Value> {
5670 Json(json!({
5671 "markets": {
5672 "BTC-USD": {
5673 "ticker": null,
5674 "status": "ACTIVE",
5675 "baseAsset": null,
5676 "quoteAsset": "USD",
5677 "stepSize": null,
5678 }
5679 }
5680 }))
5681 }
5682
5683 let app = Router::new()
5684 .route("/v4/markets", get(null_values_handler))
5685 .with_state(NullValuesState);
5686
5687 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5688 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5689 let port = listener.local_addr().unwrap().port();
5690
5691 tokio::spawn(async move {
5692 axum::serve(listener, app).await.unwrap();
5693 });
5694
5695 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5696
5697 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5698 set_data_event_sender(sender);
5699
5700 let client_id = ClientId::from("DYDX-NULL");
5701 let config = DydxDataClientConfig {
5702 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
5703 http_timeout_secs: Some(2),
5704 ..Default::default()
5705 };
5706
5707 let http_client = DydxHttpClient::new(
5708 config.base_url_http.clone(),
5709 config.http_timeout_secs,
5710 config.http_proxy_url.clone(),
5711 config.is_testnet,
5712 None,
5713 )
5714 .unwrap();
5715
5716 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5717
5718 let request = RequestInstruments::new(
5719 None,
5720 None,
5721 Some(client_id),
5722 Some(*DYDX_VENUE),
5723 UUID4::new(),
5724 get_atomic_clock_realtime().get_time_ns(),
5725 None,
5726 );
5727
5728 assert!(client.request_instruments(&request).is_ok());
5729
5730 let timeout = tokio::time::Duration::from_secs(3);
5732 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5733 tokio::time::timeout(timeout, rx.recv()).await
5734 {
5735 assert!(resp.correlation_id == request.request_id);
5737 }
5738 }
5739
5740 #[tokio::test]
5745 async fn test_invalid_instrument_id_format() {
5746 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5748 set_data_event_sender(sender);
5749
5750 let client_id = ClientId::from("DYDX-INVALID-ID");
5751 let config = DydxDataClientConfig::default();
5752
5753 let http_client = DydxHttpClient::new(
5754 config.base_url_http.clone(),
5755 config.http_timeout_secs,
5756 config.http_proxy_url.clone(),
5757 config.is_testnet,
5758 None,
5759 )
5760 .unwrap();
5761
5762 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5763
5764 let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5766
5767 let request = RequestInstrument::new(
5768 non_existent_id,
5769 None,
5770 None,
5771 Some(client_id),
5772 UUID4::new(),
5773 get_atomic_clock_realtime().get_time_ns(),
5774 None,
5775 );
5776
5777 assert!(client.request_instrument(&request).is_ok());
5778
5779 let timeout = tokio::time::Duration::from_secs(2);
5781 let result = tokio::time::timeout(timeout, rx.recv()).await;
5782
5783 match result {
5785 Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5786 }
5788 Ok(None) | Err(_) => {
5789 }
5791 _ => {}
5792 }
5793 }
5794
5795 #[tokio::test]
5796 async fn test_invalid_date_range_end_before_start() {
5797 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5799 set_data_event_sender(sender);
5800
5801 let client_id = ClientId::from("DYDX-DATE-RANGE");
5802 let config = DydxDataClientConfig::default();
5803
5804 let http_client = DydxHttpClient::new(
5805 config.base_url_http.clone(),
5806 config.http_timeout_secs,
5807 config.http_proxy_url.clone(),
5808 config.is_testnet,
5809 None,
5810 )
5811 .unwrap();
5812
5813 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5814
5815 let instrument = create_test_instrument_any();
5816 let instrument_id = instrument.id();
5817 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5818 client.instruments.insert(symbol_key, instrument);
5819
5820 let start = chrono::Utc::now();
5822 let end = start - chrono::Duration::hours(24); let request = RequestTrades::new(
5825 instrument_id,
5826 Some(start),
5827 Some(end),
5828 None,
5829 Some(client_id),
5830 UUID4::new(),
5831 get_atomic_clock_realtime().get_time_ns(),
5832 None,
5833 );
5834
5835 assert!(client.request_trades(&request).is_ok());
5836
5837 let timeout = tokio::time::Duration::from_secs(2);
5839 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5840 tokio::time::timeout(timeout, rx.recv()).await
5841 {
5842 assert!(resp.correlation_id == request.request_id);
5844 }
5845 }
5846
5847 #[tokio::test]
5848 async fn test_negative_limit_value() {
5849 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5852 set_data_event_sender(sender);
5853
5854 let client_id = ClientId::from("DYDX-NEG-LIMIT");
5855 let config = DydxDataClientConfig::default();
5856
5857 let http_client = DydxHttpClient::new(
5858 config.base_url_http.clone(),
5859 config.http_timeout_secs,
5860 config.http_proxy_url.clone(),
5861 config.is_testnet,
5862 None,
5863 )
5864 .unwrap();
5865
5866 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5867
5868 let instrument = create_test_instrument_any();
5869 let instrument_id = instrument.id();
5870 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5871 client.instruments.insert(symbol_key, instrument);
5872
5873 let request = RequestTrades::new(
5875 instrument_id,
5876 None,
5877 None,
5878 std::num::NonZeroUsize::new(1), Some(client_id),
5880 UUID4::new(),
5881 get_atomic_clock_realtime().get_time_ns(),
5882 None,
5883 );
5884
5885 assert!(client.request_trades(&request).is_ok());
5887 }
5888
5889 #[tokio::test]
5890 async fn test_zero_limit_value() {
5891 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5894 set_data_event_sender(sender);
5895
5896 let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5897 let config = DydxDataClientConfig::default();
5898
5899 let http_client = DydxHttpClient::new(
5900 config.base_url_http.clone(),
5901 config.http_timeout_secs,
5902 config.http_proxy_url.clone(),
5903 config.is_testnet,
5904 None,
5905 )
5906 .unwrap();
5907
5908 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5909
5910 let instrument = create_test_instrument_any();
5911 let instrument_id = instrument.id();
5912 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5913 client.instruments.insert(symbol_key, instrument);
5914
5915 let request = RequestTrades::new(
5916 instrument_id,
5917 None,
5918 None,
5919 None, Some(client_id),
5921 UUID4::new(),
5922 get_atomic_clock_realtime().get_time_ns(),
5923 None,
5924 );
5925
5926 assert!(client.request_trades(&request).is_ok());
5927
5928 let timeout = tokio::time::Duration::from_secs(2);
5930 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5931 tokio::time::timeout(timeout, rx.recv()).await
5932 {
5933 assert!(resp.correlation_id == request.request_id);
5934 }
5935 }
5936
5937 #[tokio::test]
5938 async fn test_very_large_limit_value() {
5939 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5941 set_data_event_sender(sender);
5942
5943 let client_id = ClientId::from("DYDX-LARGE-LIMIT");
5944 let config = DydxDataClientConfig::default();
5945
5946 let http_client = DydxHttpClient::new(
5947 config.base_url_http.clone(),
5948 config.http_timeout_secs,
5949 config.http_proxy_url.clone(),
5950 config.is_testnet,
5951 None,
5952 )
5953 .unwrap();
5954
5955 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5956
5957 let instrument = create_test_instrument_any();
5958 let instrument_id = instrument.id();
5959 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5960 client.instruments.insert(symbol_key, instrument);
5961
5962 let request = RequestTrades::new(
5963 instrument_id,
5964 None,
5965 None,
5966 std::num::NonZeroUsize::new(1_000_000), Some(client_id),
5968 UUID4::new(),
5969 get_atomic_clock_realtime().get_time_ns(),
5970 None,
5971 );
5972
5973 assert!(client.request_trades(&request).is_ok());
5975
5976 let timeout = tokio::time::Duration::from_secs(2);
5978 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5979 tokio::time::timeout(timeout, rx.recv()).await
5980 {
5981 assert!(resp.correlation_id == request.request_id);
5982 }
5983 }
5984
5985 #[tokio::test]
5986 async fn test_none_limit_uses_default() {
5987 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5989 set_data_event_sender(sender);
5990
5991 let client_id = ClientId::from("DYDX-NONE-LIMIT");
5992 let config = DydxDataClientConfig::default();
5993
5994 let http_client = DydxHttpClient::new(
5995 config.base_url_http.clone(),
5996 config.http_timeout_secs,
5997 config.http_proxy_url.clone(),
5998 config.is_testnet,
5999 None,
6000 )
6001 .unwrap();
6002
6003 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6004
6005 let instrument = create_test_instrument_any();
6006 let instrument_id = instrument.id();
6007 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6008 client.instruments.insert(symbol_key, instrument);
6009
6010 let request = RequestTrades::new(
6011 instrument_id,
6012 None,
6013 None,
6014 None, Some(client_id),
6016 UUID4::new(),
6017 get_atomic_clock_realtime().get_time_ns(),
6018 None,
6019 );
6020
6021 assert!(client.request_trades(&request).is_ok());
6023
6024 let timeout = tokio::time::Duration::from_secs(2);
6025 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6026 tokio::time::timeout(timeout, rx.recv()).await
6027 {
6028 assert!(resp.correlation_id == request.request_id);
6029 }
6030 }
6031
6032 #[tokio::test]
6033 async fn test_validation_does_not_panic() {
6034 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6036 set_data_event_sender(sender);
6037
6038 let client_id = ClientId::from("DYDX-VALIDATION");
6039 let config = DydxDataClientConfig::default();
6040
6041 let http_client = DydxHttpClient::new(
6042 config.base_url_http.clone(),
6043 config.http_timeout_secs,
6044 config.http_proxy_url.clone(),
6045 config.is_testnet,
6046 None,
6047 )
6048 .unwrap();
6049
6050 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6051
6052 let instrument = create_test_instrument_any();
6053 let instrument_id = instrument.id();
6054 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6055 client.instruments.insert(symbol_key, instrument);
6056
6057 let invalid_id = InstrumentId::from("INVALID.WRONG");
6059 let req1 = RequestInstrument::new(
6060 invalid_id,
6061 None,
6062 None,
6063 Some(client_id),
6064 UUID4::new(),
6065 get_atomic_clock_realtime().get_time_ns(),
6066 None,
6067 );
6068 assert!(client.request_instrument(&req1).is_ok());
6069
6070 let start = chrono::Utc::now();
6072 let end = start - chrono::Duration::hours(1);
6073 let req2 = RequestTrades::new(
6074 instrument_id,
6075 Some(start),
6076 Some(end),
6077 None,
6078 Some(client_id),
6079 UUID4::new(),
6080 get_atomic_clock_realtime().get_time_ns(),
6081 None,
6082 );
6083 assert!(client.request_trades(&req2).is_ok());
6084
6085 let req3 = RequestTrades::new(
6087 instrument_id,
6088 None,
6089 None,
6090 std::num::NonZeroUsize::new(1),
6091 Some(client_id),
6092 UUID4::new(),
6093 get_atomic_clock_realtime().get_time_ns(),
6094 None,
6095 );
6096 assert!(client.request_trades(&req3).is_ok());
6097
6098 let req4 = RequestTrades::new(
6100 instrument_id,
6101 None,
6102 None,
6103 std::num::NonZeroUsize::new(usize::MAX),
6104 Some(client_id),
6105 UUID4::new(),
6106 get_atomic_clock_realtime().get_time_ns(),
6107 None,
6108 );
6109 assert!(client.request_trades(&req4).is_ok());
6110
6111 }
6113
6114 #[tokio::test]
6119 async fn test_instruments_response_has_correct_venue() {
6120 use axum::{Json, Router, routing::get};
6122 use serde_json::{Value, json};
6123
6124 #[derive(Clone)]
6125 struct VenueTestState;
6126
6127 async fn venue_handler() -> Json<Value> {
6128 Json(json!({
6129 "markets": {
6130 "BTC-USD": {
6131 "ticker": "BTC-USD",
6132 "status": "ACTIVE",
6133 "baseAsset": "BTC",
6134 "quoteAsset": "USD",
6135 "stepSize": "0.0001",
6136 "tickSize": "1",
6137 "indexPrice": "50000",
6138 "oraclePrice": "50000",
6139 "priceChange24H": "1000",
6140 "nextFundingRate": "0.0001",
6141 "nextFundingAt": "2024-01-01T00:00:00.000Z",
6142 "minOrderSize": "0.001",
6143 "type": "PERPETUAL",
6144 "initialMarginFraction": "0.05",
6145 "maintenanceMarginFraction": "0.03",
6146 "volume24H": "1000000",
6147 "trades24H": "10000",
6148 "openInterest": "5000000",
6149 "incrementalInitialMarginFraction": "0.01",
6150 "incrementalPositionSize": "10",
6151 "maxPositionSize": "1000",
6152 "baselinePositionSize": "100",
6153 "assetResolution": "10000000000"
6154 }
6155 }
6156 }))
6157 }
6158
6159 let app = Router::new()
6160 .route("/v4/markets", get(venue_handler))
6161 .with_state(VenueTestState);
6162
6163 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
6164 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
6165 let port = listener.local_addr().unwrap().port();
6166
6167 tokio::spawn(async move {
6168 axum::serve(listener, app).await.unwrap();
6169 });
6170
6171 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
6172
6173 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6174 set_data_event_sender(sender);
6175
6176 let client_id = ClientId::from("DYDX-VENUE-TEST");
6177 let config = DydxDataClientConfig {
6178 base_url_http: Some(format!("http://127.0.0.1:{}", port)),
6179 http_timeout_secs: Some(2),
6180 ..Default::default()
6181 };
6182
6183 let http_client = DydxHttpClient::new(
6184 config.base_url_http.clone(),
6185 config.http_timeout_secs,
6186 config.http_proxy_url.clone(),
6187 config.is_testnet,
6188 None,
6189 )
6190 .unwrap();
6191
6192 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6193
6194 let request = RequestInstruments::new(
6195 None,
6196 None,
6197 Some(client_id),
6198 Some(*DYDX_VENUE),
6199 UUID4::new(),
6200 get_atomic_clock_realtime().get_time_ns(),
6201 None,
6202 );
6203
6204 assert!(client.request_instruments(&request).is_ok());
6205
6206 let timeout = tokio::time::Duration::from_secs(3);
6207 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6208 tokio::time::timeout(timeout, rx.recv()).await
6209 {
6210 assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6212 }
6213 }
6214
6215 #[tokio::test]
6216 async fn test_instruments_response_contains_vec_instrument_any() {
6217 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6219 set_data_event_sender(sender);
6220
6221 let client_id = ClientId::from("DYDX-VEC-TEST");
6222 let config = DydxDataClientConfig::default();
6223
6224 let http_client = DydxHttpClient::new(
6225 config.base_url_http.clone(),
6226 config.http_timeout_secs,
6227 config.http_proxy_url.clone(),
6228 config.is_testnet,
6229 None,
6230 )
6231 .unwrap();
6232
6233 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6234
6235 let request = RequestInstruments::new(
6236 None,
6237 None,
6238 Some(client_id),
6239 Some(*DYDX_VENUE),
6240 UUID4::new(),
6241 get_atomic_clock_realtime().get_time_ns(),
6242 None,
6243 );
6244
6245 assert!(client.request_instruments(&request).is_ok());
6246
6247 let timeout = tokio::time::Duration::from_secs(2);
6248 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6249 tokio::time::timeout(timeout, rx.recv()).await
6250 {
6251 assert!(
6253 resp.data.is_empty() || !resp.data.is_empty(),
6254 "data should be Vec<InstrumentAny>"
6255 );
6256 }
6257 }
6258
6259 #[tokio::test]
6260 async fn test_instruments_response_includes_correlation_id() {
6261 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6263 set_data_event_sender(sender);
6264
6265 let client_id = ClientId::from("DYDX-CORR-TEST");
6266 let config = DydxDataClientConfig::default();
6267
6268 let http_client = DydxHttpClient::new(
6269 config.base_url_http.clone(),
6270 config.http_timeout_secs,
6271 config.http_proxy_url.clone(),
6272 config.is_testnet,
6273 None,
6274 )
6275 .unwrap();
6276
6277 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6278
6279 let request_id = UUID4::new();
6280 let request = RequestInstruments::new(
6281 None,
6282 None,
6283 Some(client_id),
6284 Some(*DYDX_VENUE),
6285 request_id,
6286 get_atomic_clock_realtime().get_time_ns(),
6287 None,
6288 );
6289
6290 assert!(client.request_instruments(&request).is_ok());
6291
6292 let timeout = tokio::time::Duration::from_secs(2);
6293 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6294 tokio::time::timeout(timeout, rx.recv()).await
6295 {
6296 assert_eq!(
6298 resp.correlation_id, request_id,
6299 "correlation_id should match request_id"
6300 );
6301 }
6302 }
6303
6304 #[tokio::test]
6305 async fn test_instruments_response_includes_client_id() {
6306 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6308 set_data_event_sender(sender);
6309
6310 let client_id = ClientId::from("DYDX-CLIENT-TEST");
6311 let config = DydxDataClientConfig::default();
6312
6313 let http_client = DydxHttpClient::new(
6314 config.base_url_http.clone(),
6315 config.http_timeout_secs,
6316 config.http_proxy_url.clone(),
6317 config.is_testnet,
6318 None,
6319 )
6320 .unwrap();
6321
6322 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6323
6324 let request = RequestInstruments::new(
6325 None,
6326 None,
6327 Some(client_id),
6328 Some(*DYDX_VENUE),
6329 UUID4::new(),
6330 get_atomic_clock_realtime().get_time_ns(),
6331 None,
6332 );
6333
6334 assert!(client.request_instruments(&request).is_ok());
6335
6336 let timeout = tokio::time::Duration::from_secs(2);
6337 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6338 tokio::time::timeout(timeout, rx.recv()).await
6339 {
6340 assert_eq!(
6342 resp.client_id, client_id,
6343 "client_id should be included in response"
6344 );
6345 }
6346 }
6347
6348 #[tokio::test]
6349 async fn test_instruments_response_includes_timestamps() {
6350 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6352 set_data_event_sender(sender);
6353
6354 let client_id = ClientId::from("DYDX-TS-TEST");
6355 let config = DydxDataClientConfig::default();
6356
6357 let http_client = DydxHttpClient::new(
6358 config.base_url_http.clone(),
6359 config.http_timeout_secs,
6360 config.http_proxy_url.clone(),
6361 config.is_testnet,
6362 None,
6363 )
6364 .unwrap();
6365
6366 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6367
6368 let start = Some(chrono::Utc::now() - chrono::Duration::days(1));
6369 let end = Some(chrono::Utc::now());
6370 let ts_init = get_atomic_clock_realtime().get_time_ns();
6371
6372 let request = RequestInstruments::new(
6373 start,
6374 end,
6375 Some(client_id),
6376 Some(*DYDX_VENUE),
6377 UUID4::new(),
6378 ts_init,
6379 None,
6380 );
6381
6382 assert!(client.request_instruments(&request).is_ok());
6383
6384 let timeout = tokio::time::Duration::from_secs(2);
6385 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6386 tokio::time::timeout(timeout, rx.recv()).await
6387 {
6388 assert!(
6390 resp.start.is_some() || resp.start.is_none(),
6391 "start timestamp field exists"
6392 );
6393 assert!(
6394 resp.end.is_some() || resp.end.is_none(),
6395 "end timestamp field exists"
6396 );
6397 assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6398 }
6399 }
6400
6401 #[tokio::test]
6402 async fn test_instruments_response_includes_params_when_provided() {
6403 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6405 set_data_event_sender(sender);
6406
6407 let client_id = ClientId::from("DYDX-PARAMS-TEST");
6408 let config = DydxDataClientConfig::default();
6409
6410 let http_client = DydxHttpClient::new(
6411 config.base_url_http.clone(),
6412 config.http_timeout_secs,
6413 config.http_proxy_url.clone(),
6414 config.is_testnet,
6415 None,
6416 )
6417 .unwrap();
6418
6419 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6420
6421 let request = RequestInstruments::new(
6424 None,
6425 None,
6426 Some(client_id),
6427 Some(*DYDX_VENUE),
6428 UUID4::new(),
6429 get_atomic_clock_realtime().get_time_ns(),
6430 None, );
6432
6433 assert!(client.request_instruments(&request).is_ok());
6434
6435 let timeout = tokio::time::Duration::from_secs(2);
6436 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6437 tokio::time::timeout(timeout, rx.recv()).await
6438 {
6439 let _params = resp.params;
6441 }
6442 }
6443
6444 #[tokio::test]
6445 async fn test_instruments_response_params_none_when_not_provided() {
6446 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6448 set_data_event_sender(sender);
6449
6450 let client_id = ClientId::from("DYDX-NO-PARAMS");
6451 let config = DydxDataClientConfig::default();
6452
6453 let http_client = DydxHttpClient::new(
6454 config.base_url_http.clone(),
6455 config.http_timeout_secs,
6456 config.http_proxy_url.clone(),
6457 config.is_testnet,
6458 None,
6459 )
6460 .unwrap();
6461
6462 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6463
6464 let request = RequestInstruments::new(
6465 None,
6466 None,
6467 Some(client_id),
6468 Some(*DYDX_VENUE),
6469 UUID4::new(),
6470 get_atomic_clock_realtime().get_time_ns(),
6471 None, );
6473
6474 assert!(client.request_instruments(&request).is_ok());
6475
6476 let timeout = tokio::time::Duration::from_secs(2);
6477 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6478 tokio::time::timeout(timeout, rx.recv()).await
6479 {
6480 assert!(
6482 resp.params.is_none(),
6483 "params should be None when not provided"
6484 );
6485 }
6486 }
6487
6488 #[tokio::test]
6489 async fn test_instruments_response_complete_structure() {
6490 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6492 set_data_event_sender(sender);
6493
6494 let client_id = ClientId::from("DYDX-FULL-TEST");
6495 let config = DydxDataClientConfig::default();
6496
6497 let http_client = DydxHttpClient::new(
6498 config.base_url_http.clone(),
6499 config.http_timeout_secs,
6500 config.http_proxy_url.clone(),
6501 config.is_testnet,
6502 None,
6503 )
6504 .unwrap();
6505
6506 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6507
6508 let request_id = UUID4::new();
6509 let start = Some(chrono::Utc::now() - chrono::Duration::hours(1));
6510 let end = Some(chrono::Utc::now());
6511 let ts_init = get_atomic_clock_realtime().get_time_ns();
6512
6513 let request = RequestInstruments::new(
6514 start,
6515 end,
6516 Some(client_id),
6517 Some(*DYDX_VENUE),
6518 request_id,
6519 ts_init,
6520 None,
6521 );
6522
6523 assert!(client.request_instruments(&request).is_ok());
6524
6525 let timeout = tokio::time::Duration::from_secs(2);
6526 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6527 tokio::time::timeout(timeout, rx.recv()).await
6528 {
6529 assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6531 assert_eq!(
6532 resp.correlation_id, request_id,
6533 "correlation_id should match"
6534 );
6535 assert_eq!(resp.client_id, client_id, "client_id should match");
6536 assert!(resp.ts_init > 0, "ts_init should be set");
6537
6538 let _data: Vec<InstrumentAny> = resp.data;
6540
6541 let _start = resp.start;
6543 let _end = resp.end;
6544 let _params = resp.params;
6545 }
6546 }
6547
6548 #[tokio::test]
6553 async fn test_instrument_response_properly_boxed() {
6554 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6556 set_data_event_sender(sender);
6557
6558 let client_id = ClientId::from("DYDX-BOXED-TEST");
6559 let config = DydxDataClientConfig::default();
6560
6561 let http_client = DydxHttpClient::new(
6562 config.base_url_http.clone(),
6563 config.http_timeout_secs,
6564 config.http_proxy_url.clone(),
6565 config.is_testnet,
6566 None,
6567 )
6568 .unwrap();
6569
6570 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6571
6572 let instrument = create_test_instrument_any();
6573 let instrument_id = instrument.id();
6574 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6575 client.instruments.insert(symbol_key, instrument);
6576
6577 let request = RequestInstrument::new(
6578 instrument_id,
6579 None,
6580 None,
6581 Some(client_id),
6582 UUID4::new(),
6583 get_atomic_clock_realtime().get_time_ns(),
6584 None,
6585 );
6586
6587 assert!(client.request_instrument(&request).is_ok());
6588
6589 let timeout = tokio::time::Duration::from_secs(2);
6590 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6591 tokio::time::timeout(timeout, rx.recv()).await
6592 {
6593 let _response: Box<InstrumentResponse> = boxed_resp;
6595 }
6597 }
6598
6599 #[tokio::test]
6600 async fn test_instrument_response_contains_single_instrument() {
6601 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6603 set_data_event_sender(sender);
6604
6605 let client_id = ClientId::from("DYDX-SINGLE-TEST");
6606 let config = DydxDataClientConfig::default();
6607
6608 let http_client = DydxHttpClient::new(
6609 config.base_url_http.clone(),
6610 config.http_timeout_secs,
6611 config.http_proxy_url.clone(),
6612 config.is_testnet,
6613 None,
6614 )
6615 .unwrap();
6616
6617 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6618
6619 let instrument = create_test_instrument_any();
6620 let instrument_id = instrument.id();
6621 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6622 client.instruments.insert(symbol_key, instrument.clone());
6623
6624 let request = RequestInstrument::new(
6625 instrument_id,
6626 None,
6627 None,
6628 Some(client_id),
6629 UUID4::new(),
6630 get_atomic_clock_realtime().get_time_ns(),
6631 None,
6632 );
6633
6634 assert!(client.request_instrument(&request).is_ok());
6635
6636 let timeout = tokio::time::Duration::from_secs(2);
6637 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6638 tokio::time::timeout(timeout, rx.recv()).await
6639 {
6640 let _instrument: InstrumentAny = resp.data;
6642 }
6644 }
6645
6646 #[tokio::test]
6647 async fn test_instrument_response_has_correct_instrument_id() {
6648 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6650 set_data_event_sender(sender);
6651
6652 let client_id = ClientId::from("DYDX-ID-TEST");
6653 let config = DydxDataClientConfig::default();
6654
6655 let http_client = DydxHttpClient::new(
6656 config.base_url_http.clone(),
6657 config.http_timeout_secs,
6658 config.http_proxy_url.clone(),
6659 config.is_testnet,
6660 None,
6661 )
6662 .unwrap();
6663
6664 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6665
6666 let instrument = create_test_instrument_any();
6667 let instrument_id = instrument.id();
6668 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6669 client.instruments.insert(symbol_key, instrument);
6670
6671 let request = RequestInstrument::new(
6672 instrument_id,
6673 None,
6674 None,
6675 Some(client_id),
6676 UUID4::new(),
6677 get_atomic_clock_realtime().get_time_ns(),
6678 None,
6679 );
6680
6681 assert!(client.request_instrument(&request).is_ok());
6682
6683 let timeout = tokio::time::Duration::from_secs(2);
6684 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6685 tokio::time::timeout(timeout, rx.recv()).await
6686 {
6687 assert_eq!(
6689 resp.instrument_id, instrument_id,
6690 "instrument_id should match requested ID"
6691 );
6692 }
6693 }
6694
6695 #[tokio::test]
6696 async fn test_instrument_response_includes_metadata() {
6697 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6699 set_data_event_sender(sender);
6700
6701 let client_id = ClientId::from("DYDX-META-TEST");
6702 let config = DydxDataClientConfig::default();
6703
6704 let http_client = DydxHttpClient::new(
6705 config.base_url_http.clone(),
6706 config.http_timeout_secs,
6707 config.http_proxy_url.clone(),
6708 config.is_testnet,
6709 None,
6710 )
6711 .unwrap();
6712
6713 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6714
6715 let instrument = create_test_instrument_any();
6716 let instrument_id = instrument.id();
6717 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6718 client.instruments.insert(symbol_key, instrument);
6719
6720 let request_id = UUID4::new();
6721 let start = Some(chrono::Utc::now() - chrono::Duration::hours(1));
6722 let end = Some(chrono::Utc::now());
6723 let ts_init = get_atomic_clock_realtime().get_time_ns();
6724
6725 let request = RequestInstrument::new(
6726 instrument_id,
6727 start,
6728 end,
6729 Some(client_id),
6730 request_id,
6731 ts_init,
6732 None,
6733 );
6734
6735 assert!(client.request_instrument(&request).is_ok());
6736
6737 let timeout = tokio::time::Duration::from_secs(2);
6738 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6739 tokio::time::timeout(timeout, rx.recv()).await
6740 {
6741 assert_eq!(
6743 resp.correlation_id, request_id,
6744 "correlation_id should match"
6745 );
6746 assert_eq!(resp.client_id, client_id, "client_id should match");
6747 assert!(resp.ts_init > 0, "ts_init should be set");
6748
6749 let _start = resp.start;
6751 let _end = resp.end;
6752
6753 let _params = resp.params;
6755 }
6756 }
6757
6758 #[tokio::test]
6759 async fn test_instrument_response_matches_requested_instrument() {
6760 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6762 set_data_event_sender(sender);
6763
6764 let client_id = ClientId::from("DYDX-MATCH-TEST");
6765 let config = DydxDataClientConfig::default();
6766
6767 let http_client = DydxHttpClient::new(
6768 config.base_url_http.clone(),
6769 config.http_timeout_secs,
6770 config.http_proxy_url.clone(),
6771 config.is_testnet,
6772 None,
6773 )
6774 .unwrap();
6775
6776 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6777
6778 let instrument = create_test_instrument_any();
6779 let instrument_id = instrument.id();
6780 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6781 client.instruments.insert(symbol_key, instrument.clone());
6782
6783 let request = RequestInstrument::new(
6784 instrument_id,
6785 None,
6786 None,
6787 Some(client_id),
6788 UUID4::new(),
6789 get_atomic_clock_realtime().get_time_ns(),
6790 None,
6791 );
6792
6793 assert!(client.request_instrument(&request).is_ok());
6794
6795 let timeout = tokio::time::Duration::from_secs(2);
6796 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6797 tokio::time::timeout(timeout, rx.recv()).await
6798 {
6799 assert_eq!(
6801 resp.data.id(),
6802 instrument_id,
6803 "Returned instrument should match requested"
6804 );
6805 assert_eq!(
6806 resp.instrument_id, instrument_id,
6807 "instrument_id field should match"
6808 );
6809
6810 assert_eq!(
6812 resp.data.id(),
6813 resp.instrument_id,
6814 "data.id() should match instrument_id field"
6815 );
6816 }
6817 }
6818
6819 #[tokio::test]
6820 async fn test_instrument_response_complete_structure() {
6821 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6823 set_data_event_sender(sender);
6824
6825 let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6826 let config = DydxDataClientConfig::default();
6827
6828 let http_client = DydxHttpClient::new(
6829 config.base_url_http.clone(),
6830 config.http_timeout_secs,
6831 config.http_proxy_url.clone(),
6832 config.is_testnet,
6833 None,
6834 )
6835 .unwrap();
6836
6837 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6838
6839 let instrument = create_test_instrument_any();
6840 let instrument_id = instrument.id();
6841 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6842 client.instruments.insert(symbol_key, instrument.clone());
6843
6844 let request_id = UUID4::new();
6845 let ts_init = get_atomic_clock_realtime().get_time_ns();
6846
6847 let request = RequestInstrument::new(
6848 instrument_id,
6849 None,
6850 None,
6851 Some(client_id),
6852 request_id,
6853 ts_init,
6854 None,
6855 );
6856
6857 assert!(client.request_instrument(&request).is_ok());
6858
6859 let timeout = tokio::time::Duration::from_secs(2);
6860 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6861 tokio::time::timeout(timeout, rx.recv()).await
6862 {
6863 let _boxed: Box<InstrumentResponse> = resp.clone();
6866
6867 assert_eq!(resp.correlation_id, request_id);
6869 assert_eq!(resp.client_id, client_id);
6870 assert_eq!(resp.instrument_id, instrument_id);
6871 assert!(resp.ts_init > 0);
6872
6873 let returned_instrument: InstrumentAny = resp.data;
6875 assert_eq!(returned_instrument.id(), instrument_id);
6876
6877 let _start = resp.start;
6879 let _end = resp.end;
6880 let _params = resp.params;
6881 }
6882 }
6883
6884 #[tokio::test]
6889 async fn test_trades_response_contains_vec_trade_tick() {
6890 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6892 set_data_event_sender(sender);
6893
6894 let created_at = chrono::Utc::now();
6895 let http_trades = vec![
6896 crate::http::models::Trade {
6897 id: "trade-1".to_string(),
6898 side: OrderSide::Buy,
6899 size: dec!(1.0),
6900 price: dec!(100.0),
6901 created_at,
6902 created_at_height: 100,
6903 trade_type: crate::common::enums::DydxTradeType::Limit,
6904 },
6905 crate::http::models::Trade {
6906 id: "trade-2".to_string(),
6907 side: OrderSide::Sell,
6908 size: dec!(2.0),
6909 price: dec!(101.0),
6910 created_at: created_at + chrono::Duration::seconds(1),
6911 created_at_height: 101,
6912 trade_type: crate::common::enums::DydxTradeType::Limit,
6913 },
6914 ];
6915
6916 let trades_response = crate::http::models::TradesResponse {
6917 trades: http_trades,
6918 };
6919
6920 let state = TradesTestState {
6921 response: Arc::new(trades_response),
6922 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6923 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6924 };
6925
6926 let addr = start_trades_test_server(state).await;
6927 let base_url = format!("http://{}", addr);
6928
6929 let client_id = ClientId::from("DYDX-VEC-TEST");
6930 let config = DydxDataClientConfig {
6931 base_url_http: Some(base_url),
6932 is_testnet: true,
6933 ..Default::default()
6934 };
6935
6936 let http_client = DydxHttpClient::new(
6937 config.base_url_http.clone(),
6938 config.http_timeout_secs,
6939 config.http_proxy_url.clone(),
6940 config.is_testnet,
6941 None,
6942 )
6943 .unwrap();
6944
6945 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6946
6947 let instrument = create_test_instrument_any();
6948 let instrument_id = instrument.id();
6949 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6950 client.instruments.insert(symbol_key, instrument);
6951
6952 let request = RequestTrades::new(
6953 instrument_id,
6954 None,
6955 None,
6956 None,
6957 Some(client_id),
6958 UUID4::new(),
6959 get_atomic_clock_realtime().get_time_ns(),
6960 None,
6961 );
6962
6963 assert!(client.request_trades(&request).is_ok());
6964
6965 let timeout = tokio::time::Duration::from_millis(500);
6966 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6967 tokio::time::timeout(timeout, rx.recv()).await
6968 {
6969 let trade_ticks: Vec<TradeTick> = resp.data;
6971 assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
6972
6973 for tick in trade_ticks.iter() {
6975 assert_eq!(tick.instrument_id, instrument_id);
6976 }
6977 }
6978 }
6979
6980 #[tokio::test]
6981 async fn test_trades_response_has_correct_instrument_id() {
6982 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6984 set_data_event_sender(sender);
6985
6986 let created_at = chrono::Utc::now();
6987 let http_trade = crate::http::models::Trade {
6988 id: "instrument-id-test".to_string(),
6989 side: OrderSide::Buy,
6990 size: dec!(1.0),
6991 price: dec!(100.0),
6992 created_at,
6993 created_at_height: 100,
6994 trade_type: crate::common::enums::DydxTradeType::Limit,
6995 };
6996
6997 let trades_response = crate::http::models::TradesResponse {
6998 trades: vec![http_trade],
6999 };
7000
7001 let state = TradesTestState {
7002 response: Arc::new(trades_response),
7003 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7004 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7005 };
7006
7007 let addr = start_trades_test_server(state).await;
7008 let base_url = format!("http://{}", addr);
7009
7010 let client_id = ClientId::from("DYDX-INSTID-TEST");
7011 let config = DydxDataClientConfig {
7012 base_url_http: Some(base_url),
7013 is_testnet: true,
7014 ..Default::default()
7015 };
7016
7017 let http_client = DydxHttpClient::new(
7018 config.base_url_http.clone(),
7019 config.http_timeout_secs,
7020 config.http_proxy_url.clone(),
7021 config.is_testnet,
7022 None,
7023 )
7024 .unwrap();
7025
7026 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7027
7028 let instrument = create_test_instrument_any();
7029 let instrument_id = instrument.id();
7030 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7031 client.instruments.insert(symbol_key, instrument);
7032
7033 let request = RequestTrades::new(
7034 instrument_id,
7035 None,
7036 None,
7037 None,
7038 Some(client_id),
7039 UUID4::new(),
7040 get_atomic_clock_realtime().get_time_ns(),
7041 None,
7042 );
7043
7044 assert!(client.request_trades(&request).is_ok());
7045
7046 let timeout = tokio::time::Duration::from_millis(500);
7047 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7048 tokio::time::timeout(timeout, rx.recv()).await
7049 {
7050 assert_eq!(
7052 resp.instrument_id, instrument_id,
7053 "TradesResponse.instrument_id should match request"
7054 );
7055
7056 for tick in resp.data.iter() {
7058 assert_eq!(
7059 tick.instrument_id, instrument_id,
7060 "Each TradeTick should have correct instrument_id"
7061 );
7062 }
7063 }
7064 }
7065
7066 #[tokio::test]
7067 async fn test_trades_response_properly_ordered_by_timestamp() {
7068 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7070 set_data_event_sender(sender);
7071
7072 let base_time = chrono::Utc::now();
7073 let http_trades = vec![
7074 crate::http::models::Trade {
7075 id: "trade-oldest".to_string(),
7076 side: OrderSide::Buy,
7077 size: dec!(1.0),
7078 price: dec!(100.0),
7079 created_at: base_time,
7080 created_at_height: 100,
7081 trade_type: crate::common::enums::DydxTradeType::Limit,
7082 },
7083 crate::http::models::Trade {
7084 id: "trade-middle".to_string(),
7085 side: OrderSide::Sell,
7086 size: dec!(2.0),
7087 price: dec!(101.0),
7088 created_at: base_time + chrono::Duration::seconds(1),
7089 created_at_height: 101,
7090 trade_type: crate::common::enums::DydxTradeType::Limit,
7091 },
7092 crate::http::models::Trade {
7093 id: "trade-newest".to_string(),
7094 side: OrderSide::Buy,
7095 size: dec!(3.0),
7096 price: dec!(102.0),
7097 created_at: base_time + chrono::Duration::seconds(2),
7098 created_at_height: 102,
7099 trade_type: crate::common::enums::DydxTradeType::Limit,
7100 },
7101 ];
7102
7103 let trades_response = crate::http::models::TradesResponse {
7104 trades: http_trades,
7105 };
7106
7107 let state = TradesTestState {
7108 response: Arc::new(trades_response),
7109 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7110 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7111 };
7112
7113 let addr = start_trades_test_server(state).await;
7114 let base_url = format!("http://{}", addr);
7115
7116 let client_id = ClientId::from("DYDX-ORDER-TEST");
7117 let config = DydxDataClientConfig {
7118 base_url_http: Some(base_url),
7119 is_testnet: true,
7120 ..Default::default()
7121 };
7122
7123 let http_client = DydxHttpClient::new(
7124 config.base_url_http.clone(),
7125 config.http_timeout_secs,
7126 config.http_proxy_url.clone(),
7127 config.is_testnet,
7128 None,
7129 )
7130 .unwrap();
7131
7132 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7133
7134 let instrument = create_test_instrument_any();
7135 let instrument_id = instrument.id();
7136 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7137 client.instruments.insert(symbol_key, instrument);
7138
7139 let request = RequestTrades::new(
7140 instrument_id,
7141 None,
7142 None,
7143 None,
7144 Some(client_id),
7145 UUID4::new(),
7146 get_atomic_clock_realtime().get_time_ns(),
7147 None,
7148 );
7149
7150 assert!(client.request_trades(&request).is_ok());
7151
7152 let timeout = tokio::time::Duration::from_millis(500);
7153 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7154 tokio::time::timeout(timeout, rx.recv()).await
7155 {
7156 let trade_ticks = resp.data;
7158 assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7159
7160 for i in 1..trade_ticks.len() {
7162 assert!(
7163 trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7164 "Trades should be ordered by timestamp (ts_event) in ascending order"
7165 );
7166 }
7167
7168 assert!(
7170 trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7171 "First trade should be before second"
7172 );
7173 assert!(
7174 trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7175 "Second trade should be before third"
7176 );
7177 }
7178 }
7179
7180 #[tokio::test]
7181 async fn test_trades_response_all_trade_tick_fields_populated() {
7182 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7184 set_data_event_sender(sender);
7185
7186 let created_at = chrono::Utc::now();
7187 let http_trade = crate::http::models::Trade {
7188 id: "field-test".to_string(),
7189 side: OrderSide::Buy,
7190 size: dec!(5.5),
7191 price: dec!(12345.67),
7192 created_at,
7193 created_at_height: 999,
7194 trade_type: crate::common::enums::DydxTradeType::Limit,
7195 };
7196
7197 let trades_response = crate::http::models::TradesResponse {
7198 trades: vec![http_trade],
7199 };
7200
7201 let state = TradesTestState {
7202 response: Arc::new(trades_response),
7203 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7204 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7205 };
7206
7207 let addr = start_trades_test_server(state).await;
7208 let base_url = format!("http://{}", addr);
7209
7210 let client_id = ClientId::from("DYDX-FIELDS-TEST");
7211 let config = DydxDataClientConfig {
7212 base_url_http: Some(base_url),
7213 is_testnet: true,
7214 ..Default::default()
7215 };
7216
7217 let http_client = DydxHttpClient::new(
7218 config.base_url_http.clone(),
7219 config.http_timeout_secs,
7220 config.http_proxy_url.clone(),
7221 config.is_testnet,
7222 None,
7223 )
7224 .unwrap();
7225
7226 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7227
7228 let instrument = create_test_instrument_any();
7229 let instrument_id = instrument.id();
7230 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7231 client.instruments.insert(symbol_key, instrument);
7232
7233 let request = RequestTrades::new(
7234 instrument_id,
7235 None,
7236 None,
7237 None,
7238 Some(client_id),
7239 UUID4::new(),
7240 get_atomic_clock_realtime().get_time_ns(),
7241 None,
7242 );
7243
7244 assert!(client.request_trades(&request).is_ok());
7245
7246 let timeout = tokio::time::Duration::from_millis(500);
7247 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7248 tokio::time::timeout(timeout, rx.recv()).await
7249 {
7250 assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7251 let tick = &resp.data[0];
7252
7253 assert_eq!(
7255 tick.instrument_id, instrument_id,
7256 "instrument_id should be set"
7257 );
7258 assert!(tick.price.as_f64() > 0.0, "price should be positive");
7259 assert!(tick.size.as_f64() > 0.0, "size should be positive");
7260
7261 match tick.aggressor_side {
7263 AggressorSide::Buyer | AggressorSide::Seller => {
7264 }
7266 AggressorSide::NoAggressor => {
7267 panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7268 }
7269 }
7270
7271 assert!(
7273 !tick.trade_id.to_string().is_empty(),
7274 "trade_id should be set"
7275 );
7276
7277 assert!(tick.ts_event > 0, "ts_event should be set");
7279 assert!(tick.ts_init > 0, "ts_init should be set");
7280 assert!(
7281 tick.ts_init >= tick.ts_event,
7282 "ts_init should be >= ts_event"
7283 );
7284 }
7285 }
7286
7287 #[tokio::test]
7288 async fn test_trades_response_includes_metadata() {
7289 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7291 set_data_event_sender(sender);
7292
7293 let created_at = chrono::Utc::now();
7294 let http_trade = crate::http::models::Trade {
7295 id: "metadata-test".to_string(),
7296 side: OrderSide::Buy,
7297 size: dec!(1.0),
7298 price: dec!(100.0),
7299 created_at,
7300 created_at_height: 100,
7301 trade_type: crate::common::enums::DydxTradeType::Limit,
7302 };
7303
7304 let trades_response = crate::http::models::TradesResponse {
7305 trades: vec![http_trade],
7306 };
7307
7308 let state = TradesTestState {
7309 response: Arc::new(trades_response),
7310 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7311 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7312 };
7313
7314 let addr = start_trades_test_server(state).await;
7315 let base_url = format!("http://{}", addr);
7316
7317 let client_id = ClientId::from("DYDX-META-TEST");
7318 let config = DydxDataClientConfig {
7319 base_url_http: Some(base_url),
7320 is_testnet: true,
7321 ..Default::default()
7322 };
7323
7324 let http_client = DydxHttpClient::new(
7325 config.base_url_http.clone(),
7326 config.http_timeout_secs,
7327 config.http_proxy_url.clone(),
7328 config.is_testnet,
7329 None,
7330 )
7331 .unwrap();
7332
7333 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7334
7335 let instrument = create_test_instrument_any();
7336 let instrument_id = instrument.id();
7337 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7338 client.instruments.insert(symbol_key, instrument);
7339
7340 let request_id = UUID4::new();
7341 let request = RequestTrades::new(
7342 instrument_id,
7343 None,
7344 None,
7345 None,
7346 Some(client_id),
7347 request_id,
7348 get_atomic_clock_realtime().get_time_ns(),
7349 None,
7350 );
7351
7352 assert!(client.request_trades(&request).is_ok());
7353
7354 let timeout = tokio::time::Duration::from_millis(500);
7355 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7356 tokio::time::timeout(timeout, rx.recv()).await
7357 {
7358 assert_eq!(
7360 resp.correlation_id, request_id,
7361 "correlation_id should match request"
7362 );
7363 assert_eq!(resp.client_id, client_id, "client_id should be set");
7364 assert_eq!(
7365 resp.instrument_id, instrument_id,
7366 "instrument_id should be set"
7367 );
7368 assert!(resp.ts_init > 0, "ts_init should be set");
7369
7370 let _start = resp.start;
7371 let _end = resp.end;
7372 let _params = resp.params;
7373 }
7374 }
7375
7376 #[tokio::test]
7377 async fn test_orderbook_cache_growth_with_many_instruments() {
7378 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7379 set_data_event_sender(sender);
7380
7381 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7382 let config = DydxDataClientConfig {
7383 base_url_http: Some(base_url),
7384 is_testnet: true,
7385 ..Default::default()
7386 };
7387
7388 let http_client = DydxHttpClient::new(
7389 config.base_url_http.clone(),
7390 config.http_timeout_secs,
7391 config.http_proxy_url.clone(),
7392 config.is_testnet,
7393 None,
7394 )
7395 .unwrap();
7396
7397 let client =
7398 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7399
7400 let initial_capacity = client.order_books.capacity();
7401
7402 for i in 0..100 {
7403 let symbol = format!("INSTRUMENT-{i}");
7404 let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7405 client.order_books.insert(
7406 instrument_id,
7407 OrderBook::new(instrument_id, BookType::L2_MBP),
7408 );
7409 }
7410
7411 assert_eq!(client.order_books.len(), 100);
7412 assert!(client.order_books.capacity() >= initial_capacity);
7413
7414 client.order_books.clear();
7415 assert_eq!(client.order_books.len(), 0);
7416 }
7417
7418 #[rstest]
7419 #[ignore]
7420 fn test_instrument_id_validation_rejects_invalid_formats() {
7421 let invalid_ids = vec!["", "INVALID", "NO-VENUE", ".DYDX", "SYMBOL."];
7422
7423 for invalid_id in invalid_ids {
7424 let result = std::panic::catch_unwind(|| {
7425 let _ = InstrumentId::from(invalid_id);
7426 });
7427 assert!(
7428 result.is_err(),
7429 "Expected {invalid_id} to panic or be rejected"
7430 );
7431 }
7432 }
7433
7434 #[rstest]
7435 fn test_instrument_id_validation_accepts_valid_formats() {
7436 let valid_ids = vec![
7437 "BTC-USD-PERP.DYDX",
7438 "ETH-USD-PERP.DYDX",
7439 "SOL-USD.DYDX",
7440 "AVAX-USD-PERP.DYDX",
7441 ];
7442
7443 for valid_id in valid_ids {
7444 let instrument_id = InstrumentId::from(valid_id);
7445 assert!(
7446 !instrument_id.symbol.as_str().is_empty()
7447 && !instrument_id.venue.as_str().is_empty(),
7448 "Expected {valid_id} to have non-empty symbol and venue"
7449 );
7450 }
7451 }
7452
7453 #[tokio::test]
7454 async fn test_request_bars_with_inverted_date_range() {
7455 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7456 set_data_event_sender(sender);
7457
7458 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7459 let config = DydxDataClientConfig {
7460 base_url_http: Some(base_url),
7461 is_testnet: true,
7462 ..Default::default()
7463 };
7464
7465 let http_client = DydxHttpClient::new(
7466 config.base_url_http.clone(),
7467 config.http_timeout_secs,
7468 config.http_proxy_url.clone(),
7469 config.is_testnet,
7470 None,
7471 )
7472 .unwrap();
7473
7474 let client =
7475 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7476
7477 let instrument = create_test_instrument_any();
7478 let instrument_id = instrument.id();
7479 client
7480 .instruments
7481 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7482
7483 let spec = BarSpecification {
7484 step: std::num::NonZeroUsize::new(1).unwrap(),
7485 aggregation: BarAggregation::Minute,
7486 price_type: PriceType::Last,
7487 };
7488 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7489
7490 let now = chrono::Utc::now();
7491 let start = Some(now);
7492 let end = Some(now - chrono::Duration::hours(1));
7493
7494 let request = RequestBars::new(
7495 bar_type,
7496 start,
7497 end,
7498 None,
7499 Some(ClientId::from("dydx_test")),
7500 UUID4::new(),
7501 get_atomic_clock_realtime().get_time_ns(),
7502 None,
7503 );
7504
7505 let result = client.request_bars(&request);
7506 assert!(result.is_ok());
7507 }
7508
7509 #[tokio::test]
7510 async fn test_request_bars_with_zero_limit() {
7511 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7512 set_data_event_sender(sender);
7513
7514 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7515 let config = DydxDataClientConfig {
7516 base_url_http: Some(base_url),
7517 is_testnet: true,
7518 ..Default::default()
7519 };
7520
7521 let http_client = DydxHttpClient::new(
7522 config.base_url_http.clone(),
7523 config.http_timeout_secs,
7524 config.http_proxy_url.clone(),
7525 config.is_testnet,
7526 None,
7527 )
7528 .unwrap();
7529
7530 let client =
7531 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7532
7533 let instrument = create_test_instrument_any();
7534 let instrument_id = instrument.id();
7535 client
7536 .instruments
7537 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7538
7539 let spec = BarSpecification {
7540 step: std::num::NonZeroUsize::new(1).unwrap(),
7541 aggregation: BarAggregation::Minute,
7542 price_type: PriceType::Last,
7543 };
7544 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7545
7546 let request = RequestBars::new(
7547 bar_type,
7548 None,
7549 None,
7550 Some(std::num::NonZeroUsize::new(1).unwrap()),
7551 Some(ClientId::from("dydx_test")),
7552 UUID4::new(),
7553 get_atomic_clock_realtime().get_time_ns(),
7554 None,
7555 );
7556
7557 let result = client.request_bars(&request);
7558 assert!(result.is_ok());
7559 }
7560
7561 #[tokio::test]
7562 async fn test_request_trades_with_excessive_limit() {
7563 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7564 set_data_event_sender(sender);
7565
7566 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7567 let config = DydxDataClientConfig {
7568 base_url_http: Some(base_url),
7569 is_testnet: true,
7570 ..Default::default()
7571 };
7572
7573 let http_client = DydxHttpClient::new(
7574 config.base_url_http.clone(),
7575 config.http_timeout_secs,
7576 config.http_proxy_url.clone(),
7577 config.is_testnet,
7578 None,
7579 )
7580 .unwrap();
7581
7582 let client =
7583 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7584
7585 let instrument = create_test_instrument_any();
7586 let instrument_id = instrument.id();
7587 client
7588 .instruments
7589 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7590
7591 let request = RequestTrades::new(
7592 instrument_id,
7593 None,
7594 None,
7595 Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7596 Some(ClientId::from("dydx_test")),
7597 UUID4::new(),
7598 get_atomic_clock_realtime().get_time_ns(),
7599 None,
7600 );
7601
7602 let result = client.request_trades(&request);
7603 assert!(result.is_ok());
7604 }
7605}