1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use dashmap::DashMap;
25use nautilus_common::{
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent, DataResponse,
29 data::{
30 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
31 RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
32 SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
33 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
34 UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
35 UnsubscribeQuotes, UnsubscribeTrades,
36 },
37 },
38};
39use nautilus_core::{
40 UnixNanos,
41 datetime::datetime_to_unix_nanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46 data::{
47 Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
48 OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
49 },
50 enums::{BarAggregation, BookAction, BookType, OrderSide, RecordFlag},
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53 orderbook::OrderBook,
54 types::{Price, Quantity, price::PriceRaw},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61 common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
62 config::DydxDataClientConfig,
63 http::client::DydxHttpClient,
64 types::DydxOraclePrice,
65 websocket::client::DydxWebSocketClient,
66};
67
68struct WsMessageContext<'a> {
70 data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
71 instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
72 order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
73 last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
74 ws_client: &'a Option<DydxWebSocketClient>,
75 active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
76 active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
77 active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
78 incomplete_bars: &'a Arc<DashMap<BarType, Bar>>,
79}
80
81#[derive(Debug)]
89pub struct DydxDataClient {
90 client_id: ClientId,
92 config: DydxDataClientConfig,
94 http_client: DydxHttpClient,
96 ws_client: Option<DydxWebSocketClient>,
98 is_connected: AtomicBool,
100 cancellation_token: CancellationToken,
102 tasks: Vec<JoinHandle<()>>,
104 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
106 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
108 clock: &'static AtomicTime,
110 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
112 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
114 incomplete_bars: Arc<DashMap<BarType, Bar>>,
118 bar_type_mappings: Arc<DashMap<String, BarType>>,
122 active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
124 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
126 active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
128}
129
130impl DydxDataClient {
131 fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
137 match spec.step.get() {
138 1 => match spec.aggregation {
139 BarAggregation::Minute => Ok("1MIN"),
140 BarAggregation::Hour => Ok("1HOUR"),
141 BarAggregation::Day => Ok("1DAY"),
142 _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143 },
144 5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145 15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146 30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147 4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148 step => anyhow::bail!(
149 "Unsupported bar step: {step} with aggregation {:?}",
150 spec.aggregation
151 ),
152 }
153 }
154
155 pub fn new(
161 client_id: ClientId,
162 config: DydxDataClientConfig,
163 http_client: DydxHttpClient,
164 ws_client: Option<DydxWebSocketClient>,
165 ) -> anyhow::Result<Self> {
166 let clock = get_atomic_clock_realtime();
167 let data_sender = get_data_event_sender();
168
169 let instruments_cache = http_client.instruments().clone();
171
172 Ok(Self {
173 client_id,
174 config,
175 http_client,
176 ws_client,
177 is_connected: AtomicBool::new(false),
178 cancellation_token: CancellationToken::new(),
179 tasks: Vec::new(),
180 data_sender,
181 instruments: instruments_cache,
182 clock,
183 order_books: Arc::new(DashMap::new()),
184 last_quotes: Arc::new(DashMap::new()),
185 incomplete_bars: Arc::new(DashMap::new()),
186 bar_type_mappings: Arc::new(DashMap::new()),
187 active_orderbook_subs: Arc::new(DashMap::new()),
188 active_trade_subs: Arc::new(DashMap::new()),
189 active_bar_subs: Arc::new(DashMap::new()),
190 })
191 }
192
193 #[must_use]
195 pub fn venue(&self) -> Venue {
196 *DYDX_VENUE
197 }
198
199 fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200 self.ws_client
201 .as_ref()
202 .context("websocket client not initialized; call connect first")
203 }
204
205 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207 self.ws_client
208 .as_mut()
209 .context("websocket client not initialized; call connect first")
210 }
211
212 #[must_use]
214 pub fn is_connected(&self) -> bool {
215 self.is_connected.load(Ordering::Relaxed)
216 }
217
218 fn spawn_ws<F>(&self, fut: F, context: &'static str)
222 where
223 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224 {
225 get_runtime().spawn(async move {
226 if let Err(e) = fut.await {
227 tracing::error!("{context}: {e:?}");
228 }
229 });
230 }
231
232 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247 tracing::info!("Bootstrapping dYdX instruments");
248
249 self.http_client
251 .fetch_and_cache_instruments()
252 .await
253 .context("failed to load instruments from dYdX")?;
254
255 let instruments: Vec<InstrumentAny> = self
256 .http_client
257 .instruments()
258 .iter()
259 .map(|entry| entry.value().clone())
260 .collect();
261
262 if instruments.is_empty() {
263 tracing::warn!("No dYdX instruments were loaded");
264 return Ok(instruments);
265 }
266
267 tracing::info!("Loaded {} dYdX instruments", instruments.len());
268
269 if let Some(ref ws) = self.ws_client {
270 ws.cache_instruments(instruments.clone());
271 }
272
273 Ok(instruments)
274 }
275}
276
277#[async_trait::async_trait(?Send)]
278impl DataClient for DydxDataClient {
279 fn client_id(&self) -> ClientId {
280 self.client_id
281 }
282
283 fn venue(&self) -> Option<Venue> {
284 Some(*DYDX_VENUE)
285 }
286
287 fn start(&mut self) -> anyhow::Result<()> {
288 tracing::info!(
289 client_id = %self.client_id,
290 is_testnet = self.http_client.is_testnet(),
291 "Starting dYdX data client"
292 );
293 Ok(())
294 }
295
296 fn stop(&mut self) -> anyhow::Result<()> {
297 tracing::info!("Stopping dYdX data client {}", self.client_id);
298 self.cancellation_token.cancel();
299 self.is_connected.store(false, Ordering::Relaxed);
300 Ok(())
301 }
302
303 fn reset(&mut self) -> anyhow::Result<()> {
304 tracing::debug!("Resetting dYdX data client {}", self.client_id);
305 self.is_connected.store(false, Ordering::Relaxed);
306 self.cancellation_token = CancellationToken::new();
307 self.tasks.clear();
308 Ok(())
309 }
310
311 fn dispose(&mut self) -> anyhow::Result<()> {
312 tracing::debug!("Disposing dYdX data client {}", self.client_id);
313 self.stop()
314 }
315
316 async fn connect(&mut self) -> anyhow::Result<()> {
317 if self.is_connected() {
318 return Ok(());
319 }
320
321 tracing::info!("Connecting dYdX data client");
322
323 self.bootstrap_instruments().await?;
325
326 if self.ws_client.is_some() {
328 let ws = self.ws_client_mut()?;
329
330 ws.connect()
331 .await
332 .context("failed to connect dYdX websocket")?;
333
334 ws.subscribe_markets()
335 .await
336 .context("failed to subscribe to markets channel")?;
337
338 if let Some(rx) = ws.take_receiver() {
340 let data_tx = self.data_sender.clone();
341 let instruments = self.instruments.clone();
342 let order_books = self.order_books.clone();
343 let last_quotes = self.last_quotes.clone();
344 let ws_client = self.ws_client.clone();
345 let active_orderbook_subs = self.active_orderbook_subs.clone();
346 let active_trade_subs = self.active_trade_subs.clone();
347 let active_bar_subs = self.active_bar_subs.clone();
348 let incomplete_bars = self.incomplete_bars.clone();
349
350 let task = get_runtime().spawn(async move {
351 let mut rx = rx;
352 while let Some(msg) = rx.recv().await {
353 let ctx = WsMessageContext {
354 data_sender: &data_tx,
355 instruments: &instruments,
356 order_books: &order_books,
357 last_quotes: &last_quotes,
358 ws_client: &ws_client,
359 active_orderbook_subs: &active_orderbook_subs,
360 active_trade_subs: &active_trade_subs,
361 active_bar_subs: &active_bar_subs,
362 incomplete_bars: &incomplete_bars,
363 };
364 Self::handle_ws_message(msg, &ctx);
365 }
366 });
367 self.tasks.push(task);
368 } else {
369 tracing::warn!("No inbound WS receiver available after connect");
370 }
371 }
372
373 self.start_orderbook_refresh_task()?;
375
376 self.start_instrument_refresh_task()?;
378
379 self.is_connected.store(true, Ordering::Relaxed);
380 tracing::info!("Connected dYdX data client");
381
382 Ok(())
383 }
384
385 async fn disconnect(&mut self) -> anyhow::Result<()> {
386 if !self.is_connected() {
387 return Ok(());
388 }
389
390 tracing::info!("Disconnecting dYdX data client");
391
392 if let Some(ref mut ws) = self.ws_client {
394 ws.disconnect()
395 .await
396 .context("failed to disconnect dYdX websocket")?;
397 }
398
399 self.is_connected.store(false, Ordering::Relaxed);
400 tracing::info!("Disconnected dYdX data client");
401
402 Ok(())
403 }
404
405 fn is_connected(&self) -> bool {
406 self.is_connected.load(Ordering::Relaxed)
407 }
408
409 fn is_disconnected(&self) -> bool {
410 !self.is_connected()
411 }
412
413 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
414 tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
418 Ok(())
419 }
420
421 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
422 tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
425 Ok(())
426 }
427
428 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
429 tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
432 Ok(())
433 }
434
435 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
436 tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
439 Ok(())
440 }
441
442 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
443 let ws = self.ws_client()?.clone();
444 let instrument_id = cmd.instrument_id;
445
446 self.active_trade_subs.insert(instrument_id, ());
448
449 self.spawn_ws(
450 async move {
451 ws.subscribe_trades(instrument_id)
452 .await
453 .context("trade subscription")
454 },
455 "dYdX trade subscription",
456 );
457
458 Ok(())
459 }
460
461 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462 if cmd.book_type != BookType::L2_MBP {
463 anyhow::bail!(
464 "dYdX only supports L2_MBP order book deltas, received {:?}",
465 cmd.book_type
466 );
467 }
468
469 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
471
472 self.active_orderbook_subs.insert(cmd.instrument_id, ());
474
475 let ws = self.ws_client()?.clone();
476 let instrument_id = cmd.instrument_id;
477
478 self.spawn_ws(
479 async move {
480 ws.subscribe_orderbook(instrument_id)
481 .await
482 .context("orderbook subscription")
483 },
484 "dYdX orderbook subscription",
485 );
486
487 Ok(())
488 }
489
490 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
491 if cmd.book_type != BookType::L2_MBP {
492 anyhow::bail!(
493 "dYdX only supports L2_MBP order book snapshots, received {:?}",
494 cmd.book_type
495 );
496 }
497
498 self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501 let ws = self.ws_client()?.clone();
502 let instrument_id = cmd.instrument_id;
503
504 get_runtime().spawn(async move {
505 if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506 tracing::error!(
507 "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508 );
509 }
510 });
511
512 Ok(())
513 }
514
515 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516 tracing::debug!(
519 "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520 cmd.instrument_id
521 );
522
523 let book_cmd = SubscribeBookDeltas {
525 client_id: cmd.client_id,
526 venue: cmd.venue,
527 instrument_id: cmd.instrument_id,
528 book_type: BookType::L2_MBP,
529 depth: None,
530 managed: false,
531 params: None,
532 command_id: cmd.command_id,
533 ts_init: cmd.ts_init,
534 };
535
536 self.subscribe_book_deltas(&book_cmd)
537 }
538
539 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
540 let ws = self.ws_client()?.clone();
541 let instrument_id = cmd.bar_type.instrument_id();
542 let spec = cmd.bar_type.spec();
543
544 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
546
547 let bar_type = cmd.bar_type;
549 self.active_bar_subs
550 .insert((instrument_id, resolution.to_string()), bar_type);
551
552 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
554 let topic = format!("{ticker}/{resolution}");
555 self.bar_type_mappings.insert(topic.clone(), bar_type);
556
557 self.spawn_ws(
558 async move {
559 if let Err(e) =
561 ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
562 topic,
563 bar_type,
564 })
565 {
566 anyhow::bail!("Failed to register bar type: {e}");
567 }
568
569 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
571
572 ws.subscribe_candles(instrument_id, resolution)
573 .await
574 .context("candles subscription")
575 },
576 "dYdX candles subscription",
577 );
578
579 Ok(())
580 }
581
582 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
583 self.active_trade_subs.remove(&cmd.instrument_id);
585
586 let ws = self.ws_client()?.clone();
587 let instrument_id = cmd.instrument_id;
588
589 self.spawn_ws(
590 async move {
591 ws.unsubscribe_trades(instrument_id)
592 .await
593 .context("trade unsubscription")
594 },
595 "dYdX trade unsubscription",
596 );
597
598 Ok(())
599 }
600
601 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
602 self.active_orderbook_subs.remove(&cmd.instrument_id);
604
605 let ws = self.ws_client()?.clone();
606 let instrument_id = cmd.instrument_id;
607
608 self.spawn_ws(
609 async move {
610 ws.unsubscribe_orderbook(instrument_id)
611 .await
612 .context("orderbook unsubscription")
613 },
614 "dYdX orderbook unsubscription",
615 );
616
617 Ok(())
618 }
619
620 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
621 self.active_orderbook_subs.remove(&cmd.instrument_id);
625
626 let ws = self.ws_client()?.clone();
627 let instrument_id = cmd.instrument_id;
628
629 self.spawn_ws(
630 async move {
631 ws.unsubscribe_orderbook(instrument_id)
632 .await
633 .context("orderbook snapshot unsubscription")
634 },
635 "dYdX orderbook snapshot unsubscription",
636 );
637
638 Ok(())
639 }
640
641 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
642 tracing::debug!(
644 "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
645 cmd.instrument_id
646 );
647
648 let book_cmd = UnsubscribeBookDeltas {
649 instrument_id: cmd.instrument_id,
650 client_id: cmd.client_id,
651 venue: cmd.venue,
652 command_id: cmd.command_id,
653 ts_init: cmd.ts_init,
654 params: cmd.params.clone(),
655 };
656
657 self.unsubscribe_book_deltas(&book_cmd)
658 }
659
660 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
661 let ws = self.ws_client()?.clone();
662 let instrument_id = cmd.bar_type.instrument_id();
663 let spec = cmd.bar_type.spec();
664
665 let resolution = match spec.step.get() {
667 1 => match spec.aggregation {
668 BarAggregation::Minute => "1MIN",
669 BarAggregation::Hour => "1HOUR",
670 BarAggregation::Day => "1DAY",
671 _ => {
672 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
673 }
674 },
675 5 => {
676 if spec.aggregation == BarAggregation::Minute {
677 "5MINS"
678 } else {
679 anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
680 }
681 }
682 15 => {
683 if spec.aggregation == BarAggregation::Minute {
684 "15MINS"
685 } else {
686 anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
687 }
688 }
689 30 => {
690 if spec.aggregation == BarAggregation::Minute {
691 "30MINS"
692 } else {
693 anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
694 }
695 }
696 4 => {
697 if spec.aggregation == BarAggregation::Hour {
698 "4HOURS"
699 } else {
700 anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
701 }
702 }
703 step => {
704 anyhow::bail!("Unsupported bar step: {step}");
705 }
706 };
707
708 self.active_bar_subs
710 .remove(&(instrument_id, resolution.to_string()));
711
712 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
714 let topic = format!("{ticker}/{resolution}");
715 self.bar_type_mappings.remove(&topic);
716
717 if let Err(e) =
718 ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
719 {
720 tracing::warn!("Failed to unregister bar type: {e}");
721 }
722
723 self.spawn_ws(
724 async move {
725 ws.unsubscribe_candles(instrument_id, resolution)
726 .await
727 .context("candles unsubscription")
728 },
729 "dYdX candles unsubscription",
730 );
731
732 Ok(())
733 }
734
735 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
736 let instruments_cache = self.instruments.clone();
737 let sender = self.data_sender.clone();
738 let http = self.http_client.clone();
739 let instrument_id = request.instrument_id;
740 let request_id = request.request_id;
741 let client_id = request.client_id.unwrap_or(self.client_id);
742 let start = request.start;
743 let end = request.end;
744 let params = request.params.clone();
745 let clock = self.clock;
746 let start_nanos = datetime_to_unix_nanos(start);
747 let end_nanos = datetime_to_unix_nanos(end);
748
749 get_runtime().spawn(async move {
750 let symbol = Ustr::from(instrument_id.symbol.as_str());
752 let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
753 tracing::debug!("Found instrument {instrument_id} in cache");
754 Some(cached.clone())
755 } else {
756 tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
758 match http.request_instruments(None, None, None).await {
759 Ok(instruments) => {
760 for inst in &instruments {
762 upsert_instrument(&instruments_cache, inst.clone());
763 }
764 instruments.into_iter().find(|i| i.id() == instrument_id)
766 }
767 Err(e) => {
768 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
769 None
770 }
771 }
772 };
773
774 if let Some(inst) = instrument {
775 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
776 request_id,
777 client_id,
778 instrument_id,
779 inst,
780 start_nanos,
781 end_nanos,
782 clock.get_time_ns(),
783 params,
784 )));
785
786 if let Err(e) = sender.send(DataEvent::Response(response)) {
787 tracing::error!("Failed to send instrument response: {e}");
788 }
789 } else {
790 tracing::error!("Instrument {instrument_id} not found");
791 }
792 });
793
794 Ok(())
795 }
796
797 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
798 let http = self.http_client.clone();
799 let sender = self.data_sender.clone();
800 let instruments_cache = self.instruments.clone();
801 let request_id = request.request_id;
802 let client_id = request.client_id.unwrap_or(self.client_id);
803 let venue = self.venue();
804 let start = request.start;
805 let end = request.end;
806 let params = request.params.clone();
807 let clock = self.clock;
808 let start_nanos = datetime_to_unix_nanos(start);
809 let end_nanos = datetime_to_unix_nanos(end);
810
811 get_runtime().spawn(async move {
812 match http.request_instruments(None, None, None).await {
813 Ok(instruments) => {
814 tracing::info!("Fetched {} instruments from dYdX", instruments.len());
815
816 for instrument in &instruments {
818 upsert_instrument(&instruments_cache, instrument.clone());
819 }
820
821 let response = DataResponse::Instruments(InstrumentsResponse::new(
822 request_id,
823 client_id,
824 venue,
825 instruments,
826 start_nanos,
827 end_nanos,
828 clock.get_time_ns(),
829 params,
830 ));
831
832 if let Err(e) = sender.send(DataEvent::Response(response)) {
833 tracing::error!("Failed to send instruments response: {e}");
834 }
835 }
836 Err(e) => {
837 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
838
839 let response = DataResponse::Instruments(InstrumentsResponse::new(
841 request_id,
842 client_id,
843 venue,
844 Vec::new(),
845 start_nanos,
846 end_nanos,
847 clock.get_time_ns(),
848 params,
849 ));
850
851 if let Err(e) = sender.send(DataEvent::Response(response)) {
852 tracing::error!("Failed to send empty instruments response: {e}");
853 }
854 }
855 }
856 });
857
858 Ok(())
859 }
860
861 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
862 use nautilus_model::{
863 data::TradeTick,
864 enums::{AggressorSide, OrderSide},
865 identifiers::TradeId,
866 };
867
868 let http = self.http_client.clone();
869 let instruments = self.instruments.clone();
870 let sender = self.data_sender.clone();
871 let instrument_id = request.instrument_id;
872 let start = request.start;
873 let end = request.end;
874 let limit = request.limit.map(|n| n.get() as u32);
875 let request_id = request.request_id;
876 let client_id = request.client_id.unwrap_or(self.client_id);
877 let params = request.params.clone();
878 let clock = self.clock;
879 let start_nanos = datetime_to_unix_nanos(start);
880 let end_nanos = datetime_to_unix_nanos(end);
881
882 get_runtime().spawn(async move {
883 let ticker = instrument_id
887 .symbol
888 .as_str()
889 .trim_end_matches("-PERP")
890 .to_string();
891
892 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
894 Some(inst) => inst.clone(),
895 None => {
896 tracing::error!(
897 "request_trades: instrument {} not found in cache; cannot convert trades",
898 instrument_id
899 );
900 let ts_now = clock.get_time_ns();
901 let response = DataResponse::Trades(TradesResponse::new(
902 request_id,
903 client_id,
904 instrument_id,
905 Vec::new(),
906 start_nanos,
907 end_nanos,
908 ts_now,
909 params,
910 ));
911 if let Err(e) = sender.send(DataEvent::Response(response)) {
912 tracing::error!("Failed to send empty trades response: {e}");
913 }
914 return;
915 }
916 };
917
918 let price_precision = instrument.price_precision();
919 let size_precision = instrument.size_precision();
920
921 match http
922 .inner
923 .get_trades(&ticker, limit)
924 .await
925 .context("failed to request trades from dYdX")
926 {
927 Ok(trades_response) => {
928 let mut ticks = Vec::new();
929
930 for trade in trades_response.trades {
931 let aggressor_side = match trade.side {
932 OrderSide::Buy => AggressorSide::Buyer,
933 OrderSide::Sell => AggressorSide::Seller,
934 _ => continue, };
936
937 let price = match Price::from_decimal_dp(trade.price, price_precision) {
938 Ok(p) => p,
939 Err(e) => {
940 tracing::warn!(
941 "request_trades: failed to convert price for trade {}: {e}",
942 trade.id
943 );
944 continue;
945 }
946 };
947
948 let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
949 Ok(q) => q,
950 Err(e) => {
951 tracing::warn!(
952 "request_trades: failed to convert size for trade {}: {e}",
953 trade.id
954 );
955 continue;
956 }
957 };
958
959 let ts_event = match trade.created_at.timestamp_nanos_opt() {
960 Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
961 _ => {
962 tracing::warn!(
963 "request_trades: timestamp out of range for trade {}",
964 trade.id
965 );
966 continue;
967 }
968 };
969
970 if let Some(start_ts) = start_nanos
972 && ts_event < start_ts
973 {
974 continue;
975 }
976 if let Some(end_ts) = end_nanos
977 && ts_event > end_ts
978 {
979 continue;
980 }
981
982 let tick = TradeTick::new(
983 instrument_id,
984 price,
985 size,
986 aggressor_side,
987 TradeId::new(&trade.id),
988 ts_event,
989 clock.get_time_ns(),
990 );
991 ticks.push(tick);
992 }
993
994 let response = DataResponse::Trades(TradesResponse::new(
995 request_id,
996 client_id,
997 instrument_id,
998 ticks,
999 start_nanos,
1000 end_nanos,
1001 clock.get_time_ns(),
1002 params,
1003 ));
1004
1005 if let Err(e) = sender.send(DataEvent::Response(response)) {
1006 tracing::error!("Failed to send trades response: {e}");
1007 }
1008 }
1009 Err(e) => {
1010 tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1011
1012 let response = DataResponse::Trades(TradesResponse::new(
1013 request_id,
1014 client_id,
1015 instrument_id,
1016 Vec::new(),
1017 start_nanos,
1018 end_nanos,
1019 clock.get_time_ns(),
1020 params,
1021 ));
1022
1023 if let Err(e) = sender.send(DataEvent::Response(response)) {
1024 tracing::error!("Failed to send empty trades response: {e}");
1025 }
1026 }
1027 }
1028 });
1029
1030 Ok(())
1031 }
1032
1033 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1034 use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1035
1036 const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1037
1038 let bar_type = request.bar_type;
1039 let spec = bar_type.spec();
1040
1041 if bar_type.aggregation_source() != AggregationSource::External {
1043 anyhow::bail!(
1044 "dYdX only supports EXTERNAL aggregation, got {:?}",
1045 bar_type.aggregation_source()
1046 );
1047 }
1048
1049 if spec.price_type != PriceType::Last {
1050 anyhow::bail!(
1051 "dYdX only supports LAST price type, got {:?}",
1052 spec.price_type
1053 );
1054 }
1055
1056 let resolution = match spec.step.get() {
1058 1 => match spec.aggregation {
1059 BarAggregation::Minute => "1MIN",
1060 BarAggregation::Hour => "1HOUR",
1061 BarAggregation::Day => "1DAY",
1062 _ => {
1063 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1064 }
1065 },
1066 5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1067 15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1068 30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1069 4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1070 step => {
1071 anyhow::bail!("Unsupported bar step: {step}");
1072 }
1073 };
1074
1075 let http = self.http_client.clone();
1076 let instruments = self.instruments.clone();
1077 let sender = self.data_sender.clone();
1078 let instrument_id = bar_type.instrument_id();
1079 let symbol = instrument_id
1081 .symbol
1082 .as_str()
1083 .trim_end_matches("-PERP")
1084 .to_string();
1085 let request_id = request.request_id;
1086 let client_id = request.client_id.unwrap_or(self.client_id);
1087 let params = request.params.clone();
1088 let clock = self.clock;
1089
1090 let start = request.start;
1091 let end = request.end;
1092 let overall_limit = request.limit.map(|n| n.get() as u32);
1093
1094 let start_nanos = datetime_to_unix_nanos(start);
1096 let end_nanos = datetime_to_unix_nanos(end);
1097
1098 let resolution_enum = match resolution {
1100 "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1101 "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1102 "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1103 "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1104 "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1105 "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1106 "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1107 _ => {
1108 anyhow::bail!("Unsupported resolution: {resolution}");
1109 }
1110 };
1111
1112 get_runtime().spawn(async move {
1113 let bar_secs: i64 = match spec.aggregation {
1115 BarAggregation::Minute => spec.step.get() as i64 * 60,
1116 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1117 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1118 _ => {
1119 tracing::error!(
1120 "Unsupported aggregation for request_bars: {:?}",
1121 spec.aggregation
1122 );
1123 return;
1124 }
1125 };
1126
1127 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1129 Some(inst) => inst.clone(),
1130 None => {
1131 tracing::error!(
1132 "request_bars: instrument {} not found in cache; cannot convert candles",
1133 instrument_id
1134 );
1135 let ts_now = clock.get_time_ns();
1136 let response = DataResponse::Bars(BarsResponse::new(
1137 request_id,
1138 client_id,
1139 bar_type,
1140 Vec::new(),
1141 start_nanos,
1142 end_nanos,
1143 ts_now,
1144 params,
1145 ));
1146 if let Err(e) = sender.send(DataEvent::Response(response)) {
1147 tracing::error!("Failed to send empty bars response: {e}");
1148 }
1149 return;
1150 }
1151 };
1152
1153 let price_precision = instrument.price_precision();
1154 let size_precision = instrument.size_precision();
1155
1156 let mut all_bars: Vec<Bar> = Vec::new();
1157
1158 let (range_start, range_end) = match (start, end) {
1160 (Some(s), Some(e)) if e > s => (s, e),
1161 _ => {
1162 let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1163 match http
1164 .inner
1165 .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1166 .await
1167 {
1168 Ok(candles_response) => {
1169 tracing::debug!(
1170 "request_bars fetched {} candles without explicit date range",
1171 candles_response.candles.len()
1172 );
1173
1174 for candle in &candles_response.candles {
1175 match Self::candle_to_bar(
1176 candle,
1177 bar_type,
1178 price_precision,
1179 size_precision,
1180 bar_secs,
1181 clock,
1182 ) {
1183 Ok(bar) => all_bars.push(bar),
1184 Err(e) => {
1185 tracing::warn!(
1186 "Failed to convert dYdX candle to bar for {}: {e}",
1187 instrument_id
1188 );
1189 }
1190 }
1191 }
1192
1193 let current_time_ns = clock.get_time_ns();
1194 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1195
1196 let response = DataResponse::Bars(BarsResponse::new(
1197 request_id,
1198 client_id,
1199 bar_type,
1200 all_bars,
1201 start_nanos,
1202 end_nanos,
1203 current_time_ns,
1204 params,
1205 ));
1206
1207 if let Err(e) = sender.send(DataEvent::Response(response)) {
1208 tracing::error!("Failed to send bars response: {e}");
1209 }
1210 }
1211 Err(e) => {
1212 tracing::error!(
1213 "Failed to request candles for {symbol} without date range: {e:?}"
1214 );
1215 }
1216 }
1217 return;
1218 }
1219 };
1220
1221 let total_secs = (range_end - range_start).num_seconds().max(0);
1223 let expected_bars = (total_secs / bar_secs).max(1) as u64;
1224
1225 tracing::debug!(
1226 "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1227 range_start,
1228 range_end,
1229 expected_bars
1230 );
1231
1232 let mut remaining = overall_limit.unwrap_or(u32::MAX);
1233
1234 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1236 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1237
1238 let mut chunk_start = range_start;
1239
1240 while chunk_start < range_end && remaining > 0 {
1241 let mut chunk_end = chunk_start + chunk_duration;
1242 if chunk_end > range_end {
1243 chunk_end = range_end;
1244 }
1245
1246 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1247
1248 tracing::debug!(
1249 "request_bars chunk: {} -> {}, limit={}",
1250 chunk_start,
1251 chunk_end,
1252 per_call_limit
1253 );
1254
1255 match http
1256 .inner
1257 .get_candles(
1258 &symbol,
1259 resolution_enum,
1260 Some(per_call_limit),
1261 Some(chunk_start),
1262 Some(chunk_end),
1263 )
1264 .await
1265 {
1266 Ok(candles_response) => {
1267 let count = candles_response.candles.len() as u32;
1268
1269 if count == 0 {
1270 break;
1272 }
1273
1274 for candle in &candles_response.candles {
1276 match Self::candle_to_bar(
1277 candle,
1278 bar_type,
1279 price_precision,
1280 size_precision,
1281 bar_secs,
1282 clock,
1283 ) {
1284 Ok(bar) => all_bars.push(bar),
1285 Err(e) => {
1286 tracing::warn!(
1287 "Failed to convert dYdX candle to bar for {}: {e}",
1288 instrument_id
1289 );
1290 }
1291 }
1292 }
1293
1294 if remaining <= count {
1295 break;
1296 } else {
1297 remaining -= count;
1298 }
1299 }
1300 Err(e) => {
1301 tracing::error!(
1302 "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1303 chunk_start,
1304 chunk_end
1305 );
1306 break;
1307 }
1308 }
1309
1310 chunk_start += chunk_duration;
1311 }
1312
1313 tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1314
1315 let current_time_ns = clock.get_time_ns();
1317 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1318
1319 tracing::debug!(
1320 "request_bars filtered to {} completed bars (current_time_ns={})",
1321 all_bars.len(),
1322 current_time_ns
1323 );
1324
1325 let response = DataResponse::Bars(BarsResponse::new(
1326 request_id,
1327 client_id,
1328 bar_type,
1329 all_bars,
1330 start_nanos,
1331 end_nanos,
1332 current_time_ns,
1333 params,
1334 ));
1335
1336 if let Err(e) = sender.send(DataEvent::Response(response)) {
1337 tracing::error!("Failed to send bars response: {e}");
1338 }
1339 });
1340
1341 Ok(())
1342 }
1343}
1344
1345fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1347 let symbol = Ustr::from(instrument.id().symbol.as_str());
1348 cache.insert(symbol, instrument);
1349}
1350
1351impl DydxDataClient {
1352 pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1361 let interval_secs = match self.config.instrument_refresh_interval_secs {
1362 Some(secs) if secs > 0 => secs,
1363 _ => {
1364 tracing::info!("Instrument refresh disabled (interval not configured)");
1365 return Ok(());
1366 }
1367 };
1368
1369 let interval = Duration::from_secs(interval_secs);
1370 let http_client = self.http_client.clone();
1371 let instruments_cache = self.instruments.clone();
1372 let ws_client = self.ws_client.clone();
1373 let cancellation_token = self.cancellation_token.clone();
1374
1375 tracing::info!(
1376 "Starting instrument refresh task (interval: {}s)",
1377 interval_secs
1378 );
1379
1380 let task = get_runtime().spawn(async move {
1381 let mut interval_timer = tokio::time::interval(interval);
1382 interval_timer.tick().await; loop {
1385 tokio::select! {
1386 _ = cancellation_token.cancelled() => {
1387 tracing::info!("Instrument refresh task cancelled");
1388 break;
1389 }
1390 _ = interval_timer.tick() => {
1391 tracing::debug!("Refreshing instruments");
1392
1393 match http_client.fetch_and_cache_instruments().await {
1395 Ok(()) => {
1396 let instruments: Vec<_> = http_client
1397 .instruments()
1398 .iter()
1399 .map(|entry| entry.value().clone())
1400 .collect();
1401
1402 tracing::debug!("Refreshed {} instruments", instruments.len());
1403
1404 for instrument in &instruments {
1405 upsert_instrument(&instruments_cache, instrument.clone());
1406 }
1407
1408 if let Some(ref ws) = ws_client {
1410 ws.cache_instruments(instruments);
1411 }
1412 }
1413 Err(e) => {
1414 tracing::error!("Failed to refresh instruments: {}", e);
1415 }
1416 }
1417 }
1418 }
1419 }
1420 });
1421
1422 self.tasks.push(task);
1423 Ok(())
1424 }
1425
1426 fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1436 let interval_secs = match self.config.orderbook_refresh_interval_secs {
1437 Some(secs) if secs > 0 => secs,
1438 _ => {
1439 tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1440 return Ok(());
1441 }
1442 };
1443
1444 let interval = Duration::from_secs(interval_secs);
1445 let http_client = self.http_client.clone();
1446 let instruments = self.instruments.clone();
1447 let order_books = self.order_books.clone();
1448 let active_subs = self.active_orderbook_subs.clone();
1449 let cancellation_token = self.cancellation_token.clone();
1450 let data_sender = self.data_sender.clone();
1451
1452 tracing::info!(
1453 "Starting orderbook snapshot refresh task (interval: {}s)",
1454 interval_secs
1455 );
1456
1457 let task = get_runtime().spawn(async move {
1458 let mut interval_timer = tokio::time::interval(interval);
1459 interval_timer.tick().await; loop {
1462 tokio::select! {
1463 _ = cancellation_token.cancelled() => {
1464 tracing::info!("Orderbook refresh task cancelled");
1465 break;
1466 }
1467 _ = interval_timer.tick() => {
1468 let active_instruments: Vec<InstrumentId> = active_subs
1469 .iter()
1470 .map(|entry| *entry.key())
1471 .collect();
1472
1473 if active_instruments.is_empty() {
1474 tracing::debug!("No active orderbook subscriptions to refresh");
1475 continue;
1476 }
1477
1478 tracing::debug!(
1479 "Refreshing {} orderbook snapshots",
1480 active_instruments.len()
1481 );
1482
1483 for instrument_id in active_instruments {
1484 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1486 Some(inst) => inst.clone(),
1487 None => {
1488 tracing::warn!(
1489 "Cannot refresh orderbook: no instrument for {}",
1490 instrument_id
1491 );
1492 continue;
1493 }
1494 };
1495
1496 let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1498 let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1499
1500 let snapshot = match snapshot_result {
1501 Ok(s) => s,
1502 Err(e) => {
1503 tracing::error!(
1504 "Failed to fetch orderbook snapshot for {}: {}",
1505 instrument_id,
1506 e
1507 );
1508 continue;
1509 }
1510 };
1511
1512 let deltas_result = Self::parse_orderbook_snapshot(
1514 instrument_id,
1515 &snapshot,
1516 &instrument,
1517 );
1518
1519 let deltas = match deltas_result {
1520 Ok(d) => d,
1521 Err(e) => {
1522 tracing::error!(
1523 "Failed to parse orderbook snapshot for {}: {}",
1524 instrument_id,
1525 e
1526 );
1527 continue;
1528 }
1529 };
1530
1531 if let Some(mut book) = order_books.get_mut(&instrument_id) {
1533 if let Err(e) = book.apply_deltas(&deltas) {
1534 tracing::error!(
1535 "Failed to apply orderbook snapshot for {}: {}",
1536 instrument_id,
1537 e
1538 );
1539 continue;
1540 }
1541
1542 tracing::debug!(
1543 "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1544 instrument_id,
1545 book.best_bid_price(),
1546 book.best_ask_price()
1547 );
1548 }
1549
1550 let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1552 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1553 tracing::error!("Failed to emit orderbook snapshot: {}", e);
1554 }
1555 }
1556 }
1557 }
1558 }
1559 });
1560
1561 self.tasks.push(task);
1562 Ok(())
1563 }
1564
1565 fn parse_orderbook_snapshot(
1569 instrument_id: InstrumentId,
1570 snapshot: &crate::http::models::OrderbookResponse,
1571 instrument: &InstrumentAny,
1572 ) -> anyhow::Result<OrderBookDeltas> {
1573 use nautilus_model::{
1574 data::{BookOrder, OrderBookDelta},
1575 enums::{BookAction, OrderSide, RecordFlag},
1576 instruments::Instrument,
1577 types::{Price, Quantity},
1578 };
1579
1580 let ts_init = get_atomic_clock_realtime().get_time_ns();
1581 let mut deltas = Vec::new();
1582
1583 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1585
1586 let price_precision = instrument.price_precision();
1587 let size_precision = instrument.size_precision();
1588
1589 let bids_len = snapshot.bids.len();
1590 let asks_len = snapshot.asks.len();
1591
1592 for (idx, bid) in snapshot.bids.iter().enumerate() {
1594 let is_last = idx == bids_len - 1 && asks_len == 0;
1595 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1596
1597 let price = Price::from_decimal_dp(bid.price, price_precision)
1598 .context("failed to parse bid price")?;
1599 let size = Quantity::from_decimal_dp(bid.size, size_precision)
1600 .context("failed to parse bid size")?;
1601
1602 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1603 deltas.push(OrderBookDelta::new(
1604 instrument_id,
1605 BookAction::Add,
1606 order,
1607 flags,
1608 0,
1609 ts_init,
1610 ts_init,
1611 ));
1612 }
1613
1614 for (idx, ask) in snapshot.asks.iter().enumerate() {
1616 let is_last = idx == asks_len - 1;
1617 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1618
1619 let price = Price::from_decimal_dp(ask.price, price_precision)
1620 .context("failed to parse ask price")?;
1621 let size = Quantity::from_decimal_dp(ask.size, size_precision)
1622 .context("failed to parse ask size")?;
1623
1624 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1625 deltas.push(OrderBookDelta::new(
1626 instrument_id,
1627 BookAction::Add,
1628 order,
1629 flags,
1630 0,
1631 ts_init,
1632 ts_init,
1633 ));
1634 }
1635
1636 Ok(OrderBookDeltas::new(instrument_id, deltas))
1637 }
1638
1639 #[must_use]
1641 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1642 self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1643 }
1644
1645 #[must_use]
1647 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1648 self.instruments.iter().map(|i| i.clone()).collect()
1649 }
1650
1651 fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1652 self.order_books
1653 .entry(instrument_id)
1654 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1655 }
1656
1657 #[must_use]
1659 pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1660 self.bar_type_mappings
1661 .get(topic)
1662 .map(|entry| *entry.value())
1663 }
1664
1665 #[must_use]
1667 pub fn get_bar_topics(&self) -> Vec<String> {
1668 self.bar_type_mappings
1669 .iter()
1670 .map(|entry| entry.key().clone())
1671 .collect()
1672 }
1673
1674 fn candle_to_bar(
1681 candle: &crate::http::models::Candle,
1682 bar_type: BarType,
1683 price_precision: u8,
1684 size_precision: u8,
1685 bar_secs: i64,
1686 clock: &AtomicTime,
1687 ) -> anyhow::Result<Bar> {
1688 use anyhow::Context;
1689
1690 let ts_init =
1692 datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1693
1694 let ts_event_ns = ts_init
1696 .as_u64()
1697 .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1698 let ts_event = UnixNanos::from(ts_event_ns);
1699
1700 let open = Price::from_decimal_dp(candle.open, price_precision)
1701 .context("failed to parse candle open price")?;
1702 let high = Price::from_decimal_dp(candle.high, price_precision)
1703 .context("failed to parse candle high price")?;
1704 let low = Price::from_decimal_dp(candle.low, price_precision)
1705 .context("failed to parse candle low price")?;
1706 let close = Price::from_decimal_dp(candle.close, price_precision)
1707 .context("failed to parse candle close price")?;
1708
1709 let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1711 .context("failed to parse candle base_token_volume")?;
1712
1713 Ok(Bar::new(
1714 bar_type, open, high, low, close, volume, ts_event, ts_init,
1715 ))
1716 }
1717
1718 fn handle_ws_message(
1719 message: crate::websocket::enums::NautilusWsMessage,
1720 ctx: &WsMessageContext,
1721 ) {
1722 match message {
1723 crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1724 Self::handle_data_message(payloads, ctx.data_sender, ctx.incomplete_bars);
1725 }
1726 crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1727 Self::handle_deltas_message(
1728 *deltas,
1729 ctx.data_sender,
1730 ctx.order_books,
1731 ctx.last_quotes,
1732 ctx.instruments,
1733 );
1734 }
1735 crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1736 Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1737 }
1738 crate::websocket::enums::NautilusWsMessage::Error(err) => {
1739 tracing::error!("dYdX WS error: {err}");
1740 }
1741 crate::websocket::enums::NautilusWsMessage::Reconnected => {
1742 tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1743
1744 if let Some(ws) = ctx.ws_client {
1746 let total_subs = ctx.active_orderbook_subs.len()
1747 + ctx.active_trade_subs.len()
1748 + ctx.active_bar_subs.len();
1749
1750 if total_subs == 0 {
1751 tracing::debug!("No active subscriptions to restore");
1752 return;
1753 }
1754
1755 tracing::info!(
1756 "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1757 total_subs,
1758 ctx.active_orderbook_subs.len(),
1759 ctx.active_trade_subs.len(),
1760 ctx.active_bar_subs.len()
1761 );
1762
1763 for entry in ctx.active_orderbook_subs.iter() {
1765 let instrument_id = *entry.key();
1766 let ws_clone = ws.clone();
1767 get_runtime().spawn(async move {
1768 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1769 tracing::error!(
1770 "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1771 );
1772 } else {
1773 tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1774 }
1775 });
1776 }
1777
1778 for entry in ctx.active_trade_subs.iter() {
1780 let instrument_id = *entry.key();
1781 let ws_clone = ws.clone();
1782 get_runtime().spawn(async move {
1783 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1784 tracing::error!(
1785 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1786 );
1787 } else {
1788 tracing::debug!("Re-subscribed to trades for {instrument_id}");
1789 }
1790 });
1791 }
1792
1793 for entry in ctx.active_bar_subs.iter() {
1795 let (instrument_id, resolution) = entry.key();
1796 let instrument_id = *instrument_id;
1797 let resolution = resolution.clone();
1798 let bar_type = *entry.value();
1799 let ws_clone = ws.clone();
1800
1801 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1803 let topic = format!("{ticker}/{resolution}");
1804 if let Err(e) = ws.send_command(
1805 crate::websocket::handler::HandlerCommand::RegisterBarType {
1806 topic,
1807 bar_type,
1808 },
1809 ) {
1810 tracing::warn!(
1811 "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1812 );
1813 }
1814
1815 get_runtime().spawn(async move {
1816 if let Err(e) =
1817 ws_clone.subscribe_candles(instrument_id, &resolution).await
1818 {
1819 tracing::error!(
1820 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1821 );
1822 } else {
1823 tracing::debug!(
1824 "Re-subscribed to candles for {instrument_id} ({resolution})"
1825 );
1826 }
1827 });
1828 }
1829
1830 tracing::info!("Completed re-subscription requests after reconnection");
1831 } else {
1832 tracing::warn!("WebSocket client not available for re-subscription");
1833 }
1834 }
1835 crate::websocket::enums::NautilusWsMessage::BlockHeight(_) => {
1836 tracing::debug!(
1837 "Ignoring block height message on dYdX data client (handled by execution adapter)"
1838 );
1839 }
1840 crate::websocket::enums::NautilusWsMessage::Order(_)
1841 | crate::websocket::enums::NautilusWsMessage::Fill(_)
1842 | crate::websocket::enums::NautilusWsMessage::Position(_)
1843 | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1844 | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1845 | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1846 tracing::debug!(
1847 "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1848 );
1849 }
1850 }
1851 }
1852
1853 fn handle_data_message(
1854 payloads: Vec<NautilusData>,
1855 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1856 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1857 ) {
1858 for data in payloads {
1859 if let NautilusData::Bar(bar) = data {
1861 Self::handle_bar_message(bar, data_sender, incomplete_bars);
1862 } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1863 tracing::error!("Failed to emit data event: {e}");
1864 }
1865 }
1866 }
1867
1868 fn handle_bar_message(
1875 bar: Bar,
1876 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1877 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1878 ) {
1879 let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1880 let bar_type = bar.bar_type;
1881
1882 if bar.ts_event <= current_time_ns {
1883 incomplete_bars.remove(&bar_type);
1885 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1886 tracing::error!("Failed to emit completed bar: {e}");
1887 }
1888 } else {
1889 tracing::trace!(
1891 "Caching incomplete bar for {} (ts_event={}, current={})",
1892 bar_type,
1893 bar.ts_event,
1894 current_time_ns
1895 );
1896 incomplete_bars.insert(bar_type, bar);
1897 }
1898 }
1899
1900 fn resolve_crossed_order_book(
1918 book: &mut OrderBook,
1919 venue_deltas: OrderBookDeltas,
1920 instrument: &InstrumentAny,
1921 ) -> anyhow::Result<OrderBookDeltas> {
1922 let instrument_id = venue_deltas.instrument_id;
1923 let ts_init = venue_deltas.ts_init;
1924 let mut all_deltas = venue_deltas.deltas.clone();
1925
1926 book.apply_deltas(&venue_deltas)?;
1928
1929 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1931 (book.best_bid_price(), book.best_ask_price())
1932 {
1933 bid_price >= ask_price
1934 } else {
1935 false
1936 };
1937
1938 while is_crossed {
1940 tracing::debug!(
1941 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1942 instrument_id,
1943 book.best_bid_price(),
1944 book.best_ask_price()
1945 );
1946
1947 let bid_price = match book.best_bid_price() {
1948 Some(p) => p,
1949 None => break,
1950 };
1951 let ask_price = match book.best_ask_price() {
1952 Some(p) => p,
1953 None => break,
1954 };
1955 let bid_size = match book.best_bid_size() {
1956 Some(s) => s,
1957 None => break,
1958 };
1959 let ask_size = match book.best_ask_size() {
1960 Some(s) => s,
1961 None => break,
1962 };
1963
1964 let mut temp_deltas = Vec::new();
1965
1966 if bid_size > ask_size {
1967 let new_bid_size = Quantity::new(
1969 bid_size.as_f64() - ask_size.as_f64(),
1970 instrument.size_precision(),
1971 );
1972 temp_deltas.push(OrderBookDelta::new(
1973 instrument_id,
1974 BookAction::Update,
1975 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1976 0,
1977 0,
1978 ts_init,
1979 ts_init,
1980 ));
1981 temp_deltas.push(OrderBookDelta::new(
1982 instrument_id,
1983 BookAction::Delete,
1984 BookOrder::new(
1985 OrderSide::Sell,
1986 ask_price,
1987 Quantity::new(0.0, instrument.size_precision()),
1988 0,
1989 ),
1990 0,
1991 0,
1992 ts_init,
1993 ts_init,
1994 ));
1995 } else if bid_size < ask_size {
1996 let new_ask_size = Quantity::new(
1998 ask_size.as_f64() - bid_size.as_f64(),
1999 instrument.size_precision(),
2000 );
2001 temp_deltas.push(OrderBookDelta::new(
2002 instrument_id,
2003 BookAction::Update,
2004 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
2005 0,
2006 0,
2007 ts_init,
2008 ts_init,
2009 ));
2010 temp_deltas.push(OrderBookDelta::new(
2011 instrument_id,
2012 BookAction::Delete,
2013 BookOrder::new(
2014 OrderSide::Buy,
2015 bid_price,
2016 Quantity::new(0.0, instrument.size_precision()),
2017 0,
2018 ),
2019 0,
2020 0,
2021 ts_init,
2022 ts_init,
2023 ));
2024 } else {
2025 temp_deltas.push(OrderBookDelta::new(
2027 instrument_id,
2028 BookAction::Delete,
2029 BookOrder::new(
2030 OrderSide::Buy,
2031 bid_price,
2032 Quantity::new(0.0, instrument.size_precision()),
2033 0,
2034 ),
2035 0,
2036 0,
2037 ts_init,
2038 ts_init,
2039 ));
2040 temp_deltas.push(OrderBookDelta::new(
2041 instrument_id,
2042 BookAction::Delete,
2043 BookOrder::new(
2044 OrderSide::Sell,
2045 ask_price,
2046 Quantity::new(0.0, instrument.size_precision()),
2047 0,
2048 ),
2049 0,
2050 0,
2051 ts_init,
2052 ts_init,
2053 ));
2054 }
2055
2056 let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2058 book.apply_deltas(&temp_deltas_obj)?;
2059 all_deltas.extend(temp_deltas);
2060
2061 is_crossed = if let (Some(bid_price), Some(ask_price)) =
2063 (book.best_bid_price(), book.best_ask_price())
2064 {
2065 bid_price >= ask_price
2066 } else {
2067 false
2068 };
2069 }
2070
2071 if let Some(last_delta) = all_deltas.last_mut() {
2073 last_delta.flags = RecordFlag::F_LAST as u8;
2074 }
2075
2076 Ok(OrderBookDeltas::new(instrument_id, all_deltas))
2077 }
2078
2079 fn handle_deltas_message(
2080 deltas: OrderBookDeltas,
2081 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2082 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2083 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2084 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2085 ) {
2086 let instrument_id = deltas.instrument_id;
2087
2088 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2090 Some(inst) => inst.clone(),
2091 None => {
2092 tracing::error!(
2093 "Cannot resolve crossed order book: no instrument for {instrument_id}"
2094 );
2095 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2097 OrderBookDeltas_API::new(deltas),
2098 ))) {
2099 tracing::error!("Failed to emit order book deltas: {e}");
2100 }
2101 return;
2102 }
2103 };
2104
2105 let mut book = order_books
2107 .entry(instrument_id)
2108 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2109
2110 let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2112 {
2113 Ok(d) => d,
2114 Err(e) => {
2115 tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2116 return;
2117 }
2118 };
2119
2120 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2123 (book.best_bid_price(), book.best_ask_price())
2124 && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2125 {
2126 Some(QuoteTick::new(
2127 instrument_id,
2128 bid_price,
2129 ask_price,
2130 bid_size,
2131 ask_size,
2132 resolved_deltas.ts_event,
2133 resolved_deltas.ts_init,
2134 ))
2135 } else {
2136 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2138 tracing::debug!(
2139 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2140 );
2141 last_quotes.get(&instrument_id).map(|q| *q)
2142 } else {
2143 None
2144 }
2145 };
2146
2147 if let Some(quote) = quote_opt {
2148 let emit_quote =
2150 !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2151
2152 if emit_quote {
2153 last_quotes.insert(instrument_id, quote);
2154 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2155 tracing::error!("Failed to emit quote tick: {e}");
2156 }
2157 }
2158 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2159 tracing::debug!(
2161 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2162 book.best_bid_price(),
2163 book.best_ask_price()
2164 );
2165 }
2166
2167 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2169 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2170 tracing::error!("Failed to emit order book deltas event: {e}");
2171 }
2172 }
2173
2174 fn handle_oracle_prices(
2175 oracle_prices: std::collections::HashMap<
2176 String,
2177 crate::websocket::messages::DydxOraclePriceMarket,
2178 >,
2179 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2180 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2181 ) {
2182 let ts_init = get_atomic_clock_realtime().get_time_ns();
2183
2184 for (symbol_str, oracle_market) in oracle_prices {
2185 let symbol = Ustr::from(&symbol_str);
2186
2187 let Some(instrument) = instruments.get(&symbol) else {
2189 tracing::debug!(
2190 symbol = %symbol,
2191 "Received oracle price for unknown instrument (not cached yet)"
2192 );
2193 continue;
2194 };
2195
2196 let instrument_id = instrument.id();
2197
2198 let oracle_price_str = &oracle_market.oracle_price;
2200 let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2201 tracing::error!(
2202 symbol = %symbol,
2203 price_str = %oracle_price_str,
2204 "Failed to parse oracle price as f64"
2205 );
2206 continue;
2207 };
2208
2209 let price_precision = instrument.price_precision();
2210 let oracle_price = Price::from_raw(
2211 (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2212 price_precision,
2213 );
2214
2215 let oracle_price_event = DydxOraclePrice::new(
2216 instrument_id,
2217 oracle_price,
2218 ts_init, ts_init,
2220 );
2221
2222 tracing::debug!(
2223 instrument_id = %instrument_id,
2224 oracle_price = %oracle_price,
2225 "Received dYdX oracle price: {oracle_price_event:?}"
2226 );
2227
2228 let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2229 instrument_id,
2230 oracle_price,
2231 ts_init, ts_init,
2233 ));
2234
2235 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2236 tracing::error!("Failed to emit oracle price: {e}");
2237 }
2238 }
2239 }
2240}
2241
2242#[cfg(test)]
2243mod tests {
2244 use std::{collections::HashMap, net::SocketAddr, time::Duration};
2245
2246 use axum::{
2247 Router,
2248 extract::{Path, Query, State},
2249 response::Json,
2250 routing::get,
2251 };
2252 use chrono::Utc;
2253 use indexmap::IndexMap;
2254 use nautilus_common::{
2255 live::runner::set_data_event_sender,
2256 messages::{DataEvent, data::DataResponse},
2257 testing::wait_until_async,
2258 };
2259 use nautilus_core::UUID4;
2260 use nautilus_model::{
2261 data::{
2262 BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2263 TradeTick, order::BookOrder,
2264 },
2265 enums::{
2266 AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2267 PriceType,
2268 },
2269 identifiers::{ClientId, InstrumentId, Symbol, Venue},
2270 instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2271 orderbook::OrderBook,
2272 types::{Currency, Price, Quantity},
2273 };
2274 use rstest::rstest;
2275 use rust_decimal::Decimal;
2276 use rust_decimal_macros::dec;
2277 use tokio::net::{TcpListener, TcpStream};
2278
2279 use super::*;
2280 use crate::http::models::{Candle, CandlesResponse};
2281
2282 fn setup_test_env() {
2283 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2285 set_data_event_sender(sender);
2286 }
2287
2288 async fn wait_for_server(addr: SocketAddr) {
2289 wait_until_async(
2290 || async move { TcpStream::connect(addr).await.is_ok() },
2291 Duration::from_secs(5),
2292 )
2293 .await;
2294 }
2295
2296 #[rstest]
2297 fn test_new_data_client() {
2298 setup_test_env();
2299
2300 let client_id = ClientId::from("DYDX-001");
2301 let config = DydxDataClientConfig::default();
2302 let http_client = DydxHttpClient::default();
2303
2304 let client = DydxDataClient::new(client_id, config, http_client, None);
2305 assert!(client.is_ok());
2306
2307 let client = client.unwrap();
2308 assert_eq!(client.client_id(), client_id);
2309 assert_eq!(client.venue(), *DYDX_VENUE);
2310 assert!(!client.is_connected());
2311 }
2312
2313 #[tokio::test]
2314 async fn test_data_client_lifecycle() {
2315 setup_test_env();
2316
2317 let client_id = ClientId::from("DYDX-001");
2318 let config = DydxDataClientConfig::default();
2319 let http_client = DydxHttpClient::default();
2320
2321 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2322
2323 assert!(client.start().is_ok());
2325
2326 assert!(client.stop().is_ok());
2328 assert!(!client.is_connected());
2329
2330 assert!(client.reset().is_ok());
2332
2333 assert!(client.dispose().is_ok());
2335 }
2336
2337 #[rstest]
2338 fn test_subscribe_unsubscribe_instruments_noop() {
2339 setup_test_env();
2340
2341 let client_id = ClientId::from("DYDX-TEST");
2342 let config = DydxDataClientConfig::default();
2343 let http_client = DydxHttpClient::default();
2344
2345 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2346
2347 let venue = *DYDX_VENUE;
2348 let command_id = UUID4::new();
2349 let ts_init = get_atomic_clock_realtime().get_time_ns();
2350
2351 let subscribe = SubscribeInstruments {
2352 client_id: Some(client_id),
2353 venue,
2354 command_id,
2355 ts_init,
2356 params: None,
2357 };
2358 let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2359
2360 assert!(client.subscribe_instruments(&subscribe).is_ok());
2362 assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2363 }
2364
2365 #[rstest]
2366 fn test_bar_type_mappings_registration() {
2367 setup_test_env();
2368
2369 let client_id = ClientId::from("DYDX-TEST");
2370 let config = DydxDataClientConfig::default();
2371 let http_client = DydxHttpClient::default();
2372
2373 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2374
2375 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2376 let spec = BarSpecification {
2377 step: std::num::NonZeroUsize::new(1).unwrap(),
2378 aggregation: BarAggregation::Minute,
2379 price_type: PriceType::Last,
2380 };
2381 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2382
2383 assert!(client.get_bar_topics().is_empty());
2385 assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2386
2387 client
2389 .bar_type_mappings
2390 .insert("BTC-USD/1MIN".to_string(), bar_type);
2391
2392 assert_eq!(client.get_bar_topics().len(), 1);
2394 assert!(
2395 client
2396 .get_bar_topics()
2397 .contains(&"BTC-USD/1MIN".to_string())
2398 );
2399 assert_eq!(
2400 client.get_bar_type_for_topic("BTC-USD/1MIN"),
2401 Some(bar_type)
2402 );
2403
2404 let spec_5min = BarSpecification {
2406 step: std::num::NonZeroUsize::new(5).unwrap(),
2407 aggregation: BarAggregation::Minute,
2408 price_type: PriceType::Last,
2409 };
2410 let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2411 client
2412 .bar_type_mappings
2413 .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2414
2415 assert_eq!(client.get_bar_topics().len(), 2);
2417 assert_eq!(
2418 client.get_bar_type_for_topic("BTC-USD/5MINS"),
2419 Some(bar_type_5min)
2420 );
2421 }
2422
2423 #[rstest]
2424 fn test_bar_type_mappings_unregistration() {
2425 setup_test_env();
2426
2427 let client_id = ClientId::from("DYDX-TEST");
2428 let config = DydxDataClientConfig::default();
2429 let http_client = DydxHttpClient::default();
2430
2431 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2432
2433 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2434 let spec = BarSpecification {
2435 step: std::num::NonZeroUsize::new(1).unwrap(),
2436 aggregation: BarAggregation::Hour,
2437 price_type: PriceType::Last,
2438 };
2439 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2440
2441 client
2443 .bar_type_mappings
2444 .insert("ETH-USD/1HOUR".to_string(), bar_type);
2445 assert_eq!(client.get_bar_topics().len(), 1);
2446
2447 client.bar_type_mappings.remove("ETH-USD/1HOUR");
2449
2450 assert!(client.get_bar_topics().is_empty());
2452 assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2453 }
2454
2455 #[rstest]
2456 fn test_bar_type_mappings_lookup_nonexistent() {
2457 setup_test_env();
2458
2459 let client_id = ClientId::from("DYDX-TEST");
2460 let config = DydxDataClientConfig::default();
2461 let http_client = DydxHttpClient::default();
2462
2463 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2464
2465 assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2467 assert!(client.get_bar_topics().is_empty());
2468 }
2469
2470 #[tokio::test]
2471 async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2472 setup_test_env();
2473
2474 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2475 let instruments = Arc::new(DashMap::new());
2476 let order_books = Arc::new(DashMap::new());
2477 let last_quotes = Arc::new(DashMap::new());
2478 let ws_client: Option<DydxWebSocketClient> = None;
2479 let active_orderbook_subs = Arc::new(DashMap::new());
2480 let active_trade_subs = Arc::new(DashMap::new());
2481 let active_bar_subs = Arc::new(DashMap::new());
2482
2483 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2484 let bar_ts = get_atomic_clock_realtime().get_time_ns();
2485
2486 use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2488 let symbol = Symbol::from("BTC-USD-PERP");
2489 let instrument = CryptoPerpetual::new(
2490 instrument_id,
2491 symbol,
2492 Currency::BTC(),
2493 Currency::USD(),
2494 Currency::USD(),
2495 false,
2496 2,
2497 4,
2498 Price::from("0.01"),
2499 Quantity::from("0.0001"),
2500 None,
2501 None,
2502 None,
2503 None,
2504 None,
2505 None,
2506 None,
2507 None,
2508 None,
2509 None,
2510 None,
2511 None,
2512 bar_ts,
2513 bar_ts,
2514 );
2515 instruments.insert(
2516 Ustr::from("BTC-USD-PERP"),
2517 InstrumentAny::CryptoPerpetual(instrument),
2518 );
2519
2520 let price = Price::from("100.00");
2521 let size = Quantity::from("1.0");
2522
2523 let bid_delta = OrderBookDelta::new(
2525 instrument_id,
2526 BookAction::Add,
2527 BookOrder::new(OrderSide::Buy, price, size, 1),
2528 0,
2529 1,
2530 bar_ts,
2531 bar_ts,
2532 );
2533 let ask_delta = OrderBookDelta::new(
2534 instrument_id,
2535 BookAction::Add,
2536 BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2537 0,
2538 1,
2539 bar_ts,
2540 bar_ts,
2541 );
2542 let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2543
2544 let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2545
2546 let incomplete_bars = Arc::new(DashMap::new());
2547 let ctx = WsMessageContext {
2548 data_sender: &sender,
2549 instruments: &instruments,
2550 order_books: &order_books,
2551 last_quotes: &last_quotes,
2552 ws_client: &ws_client,
2553 active_orderbook_subs: &active_orderbook_subs,
2554 active_trade_subs: &active_trade_subs,
2555 active_bar_subs: &active_bar_subs,
2556 incomplete_bars: &incomplete_bars,
2557 };
2558 DydxDataClient::handle_ws_message(message, &ctx);
2559
2560 assert!(order_books.get(&instrument_id).is_some());
2562 assert!(last_quotes.get(&instrument_id).is_some());
2563
2564 let mut saw_quote = false;
2566 let mut saw_deltas = false;
2567
2568 while let Ok(event) = rx.try_recv() {
2569 if let DataEvent::Data(data) = event {
2570 match data {
2571 NautilusData::Quote(_) => saw_quote = true,
2572 NautilusData::Deltas(_) => saw_deltas = true,
2573 _ => {}
2574 }
2575 }
2576 }
2577
2578 assert!(saw_quote);
2579 assert!(saw_deltas);
2580 }
2581
2582 #[rstest]
2583 fn test_handle_ws_message_error_does_not_panic() {
2584 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2587 let instruments = Arc::new(DashMap::new());
2588 let order_books = Arc::new(DashMap::new());
2589 let last_quotes = Arc::new(DashMap::new());
2590 let ws_client: Option<DydxWebSocketClient> = None;
2591 let active_orderbook_subs = Arc::new(DashMap::new());
2592 let active_trade_subs = Arc::new(DashMap::new());
2593 let active_bar_subs = Arc::new(DashMap::new());
2594 let incomplete_bars = Arc::new(DashMap::new());
2595
2596 let ctx = WsMessageContext {
2597 data_sender: &sender,
2598 instruments: &instruments,
2599 order_books: &order_books,
2600 last_quotes: &last_quotes,
2601 ws_client: &ws_client,
2602 active_orderbook_subs: &active_orderbook_subs,
2603 active_trade_subs: &active_trade_subs,
2604 active_bar_subs: &active_bar_subs,
2605 incomplete_bars: &incomplete_bars,
2606 };
2607
2608 let err = crate::websocket::error::DydxWebSocketError::from_message(
2609 "malformed WebSocket payload".to_string(),
2610 );
2611
2612 DydxDataClient::handle_ws_message(
2613 crate::websocket::enums::NautilusWsMessage::Error(err),
2614 &ctx,
2615 );
2616 }
2617
2618 #[tokio::test]
2619 async fn test_request_bars_partitioning_math_does_not_panic() {
2620 setup_test_env();
2621
2622 let client_id = ClientId::from("DYDX-BARS");
2623 let config = DydxDataClientConfig::default();
2624 let http_client = DydxHttpClient::default();
2625
2626 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2627
2628 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2629 let spec = BarSpecification {
2630 step: std::num::NonZeroUsize::new(1).unwrap(),
2631 aggregation: BarAggregation::Minute,
2632 price_type: PriceType::Last,
2633 };
2634 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2635
2636 let now = Utc::now();
2637 let start = Some(now - chrono::Duration::hours(10));
2638 let end = Some(now);
2639
2640 let request = RequestBars::new(
2641 bar_type,
2642 start,
2643 end,
2644 None,
2645 Some(client_id),
2646 UUID4::new(),
2647 get_atomic_clock_realtime().get_time_ns(),
2648 None,
2649 );
2650
2651 assert!(client.request_bars(&request).is_ok());
2654 }
2655
2656 #[tokio::test]
2657 async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2658 setup_test_env();
2659
2660 let now = Utc::now();
2662 let candle = crate::http::models::Candle {
2663 started_at: now - chrono::Duration::minutes(1),
2664 ticker: "BTC-USD".to_string(),
2665 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2666 open: dec!(100.0),
2667 high: dec!(101.0),
2668 low: dec!(99.0),
2669 close: dec!(100.5),
2670 base_token_volume: dec!(1.0),
2671 usd_volume: dec!(100.0),
2672 trades: 10,
2673 starting_open_interest: dec!(1000.0),
2674 };
2675 let candles_response = crate::http::models::CandlesResponse {
2676 candles: vec![candle],
2677 };
2678 let state = CandlesTestState {
2679 response: Arc::new(candles_response),
2680 };
2681 let addr = start_candles_test_server(state).await;
2682 let base_url = format!("http://{addr}");
2683
2684 let client_id = ClientId::from("DYDX-BARS-MONTHS");
2685 let config = DydxDataClientConfig {
2686 base_url_http: Some(base_url),
2687 is_testnet: true,
2688 ..Default::default()
2689 };
2690
2691 let http_client = DydxHttpClient::new(
2692 config.base_url_http.clone(),
2693 config.http_timeout_secs,
2694 config.http_proxy_url.clone(),
2695 config.is_testnet,
2696 None,
2697 )
2698 .unwrap();
2699
2700 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2701
2702 let instrument = create_test_instrument_any();
2704 let instrument_id = instrument.id();
2705 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2706 client.instruments.insert(symbol_key, instrument);
2707
2708 let spec = BarSpecification {
2709 step: std::num::NonZeroUsize::new(1).unwrap(),
2710 aggregation: BarAggregation::Minute,
2711 price_type: PriceType::Last,
2712 };
2713 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2714
2715 let start = Some(now - chrono::Duration::days(90));
2717 let end = Some(now);
2718
2719 let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2721
2722 let request = RequestBars::new(
2723 bar_type,
2724 start,
2725 end,
2726 limit,
2727 Some(client_id),
2728 UUID4::new(),
2729 get_atomic_clock_realtime().get_time_ns(),
2730 None,
2731 );
2732
2733 assert!(client.request_bars(&request).is_ok());
2734 }
2735
2736 #[derive(Clone)]
2737 struct OrderbookTestState {
2738 snapshot: Arc<crate::http::models::OrderbookResponse>,
2739 }
2740
2741 #[derive(Clone)]
2742 struct TradesTestState {
2743 response: Arc<crate::http::models::TradesResponse>,
2744 last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
2745 last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
2746 }
2747
2748 #[derive(Clone)]
2749 struct CandlesTestState {
2750 response: Arc<crate::http::models::CandlesResponse>,
2751 }
2752
2753 async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2754 async fn handle_orderbook(
2755 Path(_ticker): Path<String>,
2756 State(state): State<OrderbookTestState>,
2757 ) -> Json<crate::http::models::OrderbookResponse> {
2758 Json((*state.snapshot).clone())
2759 }
2760
2761 let router = Router::new().route(
2762 "/v4/orderbooks/perpetualMarket/{ticker}",
2763 get(handle_orderbook).with_state(state),
2764 );
2765
2766 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2767 let addr = listener.local_addr().unwrap();
2768
2769 get_runtime().spawn(async move {
2770 axum::serve(listener, router.into_make_service())
2771 .await
2772 .unwrap();
2773 });
2774
2775 wait_for_server(addr).await;
2776 addr
2777 }
2778
2779 async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2780 async fn handle_trades(
2781 Path(ticker): Path<String>,
2782 Query(params): Query<HashMap<String, String>>,
2783 State(state): State<TradesTestState>,
2784 ) -> Json<crate::http::models::TradesResponse> {
2785 {
2786 let mut last_ticker = state.last_ticker.lock().await;
2787 *last_ticker = Some(ticker);
2788 }
2789
2790 let limit = params
2791 .get("limit")
2792 .and_then(|value| value.parse::<u32>().ok());
2793 {
2794 let mut last_limit = state.last_limit.lock().await;
2795 *last_limit = Some(limit);
2796 }
2797
2798 Json((*state.response).clone())
2799 }
2800
2801 let router = Router::new().route(
2802 "/v4/trades/perpetualMarket/{ticker}",
2803 get(handle_trades).with_state(state),
2804 );
2805
2806 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2807 let addr = listener.local_addr().unwrap();
2808
2809 get_runtime().spawn(async move {
2810 axum::serve(listener, router.into_make_service())
2811 .await
2812 .unwrap();
2813 });
2814
2815 wait_for_server(addr).await;
2816 addr
2817 }
2818
2819 async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2820 async fn handle_candles(
2821 Path(_ticker): Path<String>,
2822 Query(_params): Query<HashMap<String, String>>,
2823 State(state): State<CandlesTestState>,
2824 ) -> Json<crate::http::models::CandlesResponse> {
2825 Json((*state.response).clone())
2826 }
2827
2828 let router = Router::new().route(
2829 "/v4/candles/perpetualMarkets/{ticker}",
2830 get(handle_candles).with_state(state),
2831 );
2832
2833 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2834 let addr = listener.local_addr().unwrap();
2835
2836 get_runtime().spawn(async move {
2837 axum::serve(listener, router.into_make_service())
2838 .await
2839 .unwrap();
2840 });
2841
2842 wait_for_server(addr).await;
2843 addr
2844 }
2845
2846 fn create_test_instrument_any() -> InstrumentAny {
2847 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2848
2849 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2850 instrument_id,
2851 instrument_id.symbol,
2852 Currency::BTC(),
2853 Currency::USD(),
2854 Currency::USD(),
2855 false,
2856 2, 8, Price::new(0.01, 2), Quantity::new(0.001, 8), Some(Quantity::new(1.0, 0)), Some(Quantity::new(0.001, 8)), Some(Quantity::new(100000.0, 8)), Some(Quantity::new(0.001, 8)), None, None, Some(Price::new(1000000.0, 2)), Some(Price::new(0.01, 2)), Some(dec!(0.05)), Some(dec!(0.03)), Some(dec!(0.0002)), Some(dec!(0.0005)), UnixNanos::default(), UnixNanos::default(), ))
2875 }
2876
2877 #[tokio::test]
2882 async fn test_candle_to_bar_price_size_edge_cases() {
2883 setup_test_env();
2884
2885 let clock = get_atomic_clock_realtime();
2886 let now = Utc::now();
2887
2888 let candle = Candle {
2890 started_at: now,
2891 ticker: "BTC-USD".to_string(),
2892 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2893 open: dec!(123456789.123456),
2894 high: dec!(987654321.987654), low: dec!(123456.789), close: dec!(223456789.123456), base_token_volume: dec!(0.00000001),
2898 usd_volume: dec!(1234500.0),
2899 trades: 42,
2900 starting_open_interest: dec!(1000.0),
2901 };
2902
2903 let instrument = create_test_instrument_any();
2904 let instrument_id = instrument.id();
2905 let spec = BarSpecification {
2906 step: std::num::NonZeroUsize::new(1).unwrap(),
2907 aggregation: BarAggregation::Minute,
2908 price_type: PriceType::Last,
2909 };
2910 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2911
2912 let bar = DydxDataClient::candle_to_bar(
2913 &candle,
2914 bar_type,
2915 instrument.price_precision(),
2916 instrument.size_precision(),
2917 60,
2918 clock,
2919 )
2920 .expect("candle_to_bar should handle large/scientific values");
2921
2922 assert!(bar.open.as_f64() > 0.0);
2923 assert!(bar.high.as_f64() >= bar.low.as_f64());
2924 assert!(bar.volume.as_f64() > 0.0);
2925 }
2926
2927 #[tokio::test]
2928 async fn test_candle_to_bar_ts_event_overflow_safe() {
2929 setup_test_env();
2930
2931 let clock = get_atomic_clock_realtime();
2932 let now = Utc::now();
2933
2934 let candle = Candle {
2935 started_at: now,
2936 ticker: "BTC-USD".to_string(),
2937 resolution: crate::common::enums::DydxCandleResolution::OneDay,
2938 open: Decimal::from(1),
2939 high: Decimal::from(1),
2940 low: Decimal::from(1),
2941 close: Decimal::from(1),
2942 base_token_volume: Decimal::from(1),
2943 usd_volume: Decimal::from(1),
2944 trades: 1,
2945 starting_open_interest: Decimal::from(1),
2946 };
2947
2948 let instrument = create_test_instrument_any();
2949 let instrument_id = instrument.id();
2950 let spec = BarSpecification {
2951 step: std::num::NonZeroUsize::new(1).unwrap(),
2952 aggregation: BarAggregation::Day,
2953 price_type: PriceType::Last,
2954 };
2955 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2956
2957 let bar_secs = i64::MAX / 1_000_000_000;
2959 let bar = DydxDataClient::candle_to_bar(
2960 &candle,
2961 bar_type,
2962 instrument.price_precision(),
2963 instrument.size_precision(),
2964 bar_secs,
2965 clock,
2966 )
2967 .expect("candle_to_bar should not overflow on ts_event");
2968
2969 assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2970 }
2971
2972 #[tokio::test]
2973 async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2974 let clock = get_atomic_clock_realtime();
2977 let now = Utc::now();
2978
2979 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2982 set_data_event_sender(sender);
2983
2984 let candle_past = Candle {
2986 started_at: now - chrono::Duration::minutes(2),
2987 ticker: "BTC-USD".to_string(),
2988 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2989 open: Decimal::from(1),
2990 high: Decimal::from(2),
2991 low: Decimal::from(1),
2992 close: Decimal::from(1),
2993 base_token_volume: Decimal::from(1),
2994 usd_volume: Decimal::from(1),
2995 trades: 1,
2996 starting_open_interest: Decimal::from(1),
2997 };
2998 let candle_future = Candle {
2999 started_at: now + chrono::Duration::minutes(2),
3000 ..candle_past.clone()
3001 };
3002
3003 let candles_response = CandlesResponse {
3004 candles: vec![candle_past, candle_future],
3005 };
3006
3007 let state = CandlesTestState {
3008 response: Arc::new(candles_response),
3009 };
3010 let addr = start_candles_test_server(state).await;
3011 let base_url = format!("http://{addr}");
3012
3013 let client_id = ClientId::from("DYDX-BARS-SKEW");
3014 let config = DydxDataClientConfig {
3015 base_url_http: Some(base_url),
3016 is_testnet: true,
3017 ..Default::default()
3018 };
3019
3020 let http_client = DydxHttpClient::new(
3021 config.base_url_http.clone(),
3022 config.http_timeout_secs,
3023 config.http_proxy_url.clone(),
3024 config.is_testnet,
3025 None,
3026 )
3027 .unwrap();
3028
3029 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3030
3031 let instrument = create_test_instrument_any();
3032 let instrument_id = instrument.id();
3033 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3034 client.instruments.insert(symbol_key, instrument);
3035
3036 let spec = BarSpecification {
3037 step: std::num::NonZeroUsize::new(1).unwrap(),
3038 aggregation: BarAggregation::Minute,
3039 price_type: PriceType::Last,
3040 };
3041 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
3042
3043 let request = RequestBars::new(
3044 bar_type,
3045 Some(now - chrono::Duration::minutes(5)),
3046 Some(now + chrono::Duration::minutes(5)),
3047 None,
3048 Some(client_id),
3049 UUID4::new(),
3050 clock.get_time_ns(),
3051 None,
3052 );
3053
3054 assert!(client.request_bars(&request).is_ok());
3055
3056 let timeout = tokio::time::Duration::from_secs(3);
3057 if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
3058 tokio::time::timeout(timeout, rx.recv()).await
3059 {
3060 assert_eq!(resp.data.len(), 1);
3062 }
3063 }
3064
3065 #[rstest]
3066 fn test_decimal_to_f64_precision_loss_within_tolerance() {
3067 let price_value = 12345.125_f64;
3069 let qty_value = 0.00012345_f64;
3070
3071 let price = Price::new(price_value, 6);
3072 let qty = Quantity::new(qty_value, 8);
3073
3074 let price_diff = (price.as_f64() - price_value).abs();
3075 let qty_diff = (qty.as_f64() - qty_value).abs();
3076
3077 assert!(price_diff < 1e-10);
3079 assert!(qty_diff < 1e-12);
3080 }
3081
3082 #[tokio::test]
3083 async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
3084 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3086 set_data_event_sender(sender);
3087
3088 let snapshot = crate::http::models::OrderbookResponse {
3090 bids: vec![crate::http::models::OrderbookLevel {
3091 price: dec!(100.0),
3092 size: dec!(1.0),
3093 }],
3094 asks: vec![crate::http::models::OrderbookLevel {
3095 price: dec!(101.0),
3096 size: dec!(2.0),
3097 }],
3098 };
3099 let state = OrderbookTestState {
3100 snapshot: Arc::new(snapshot),
3101 };
3102 let addr = start_orderbook_test_server(state).await;
3103 let base_url = format!("http://{addr}");
3104
3105 let client_id = ClientId::from("DYDX-REFRESH");
3107 let config = DydxDataClientConfig {
3108 is_testnet: true,
3109 base_url_http: Some(base_url),
3110 orderbook_refresh_interval_secs: Some(1),
3111 instrument_refresh_interval_secs: None,
3112 ..Default::default()
3113 };
3114
3115 let http_client = DydxHttpClient::new(
3116 config.base_url_http.clone(),
3117 config.http_timeout_secs,
3118 config.http_proxy_url.clone(),
3119 config.is_testnet,
3120 None,
3121 )
3122 .unwrap();
3123
3124 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3125
3126 let instrument = create_test_instrument_any();
3128 let instrument_id = instrument.id();
3129 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3130 client.instruments.insert(symbol_key, instrument);
3131 client.order_books.insert(
3132 instrument_id,
3133 OrderBook::new(instrument_id, BookType::L2_MBP),
3134 );
3135 client.active_orderbook_subs.insert(instrument_id, ());
3136
3137 client.start_orderbook_refresh_task().unwrap();
3139
3140 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3141 let mut saw_snapshot_event = false;
3142
3143 while std::time::Instant::now() < deadline {
3144 if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3145 tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3146 {
3147 saw_snapshot_event = true;
3148 break;
3149 }
3150 }
3151
3152 assert!(
3153 saw_snapshot_event,
3154 "expected at least one snapshot deltas event from refresh task"
3155 );
3156
3157 let book = client
3159 .order_books
3160 .get(&instrument_id)
3161 .expect("orderbook should exist after refresh");
3162 let best_bid = book.best_bid_price().expect("best bid should be set");
3163 let best_ask = book.best_ask_price().expect("best ask should be set");
3164
3165 assert_eq!(best_bid, Price::from("100.00"));
3166 assert_eq!(best_ask, Price::from("101.00"));
3167 }
3168
3169 #[rstest]
3170 fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3171 let instrument = create_test_instrument_any();
3174 let instrument_id = instrument.id();
3175 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3176 let ts_init = get_atomic_clock_realtime().get_time_ns();
3177
3178 let initial_deltas = vec![
3180 OrderBookDelta::new(
3181 instrument_id,
3182 BookAction::Add,
3183 BookOrder::new(
3184 OrderSide::Buy,
3185 Price::from("99.00"),
3186 Quantity::from("1.0"),
3187 0,
3188 ),
3189 0,
3190 0,
3191 ts_init,
3192 ts_init,
3193 ),
3194 OrderBookDelta::new(
3195 instrument_id,
3196 BookAction::Add,
3197 BookOrder::new(
3198 OrderSide::Sell,
3199 Price::from("101.00"),
3200 Quantity::from("2.0"),
3201 0,
3202 ),
3203 0,
3204 0,
3205 ts_init,
3206 ts_init,
3207 ),
3208 ];
3209 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3210 .unwrap();
3211
3212 let crossed_deltas = vec![OrderBookDelta::new(
3214 instrument_id,
3215 BookAction::Add,
3216 BookOrder::new(
3217 OrderSide::Buy,
3218 Price::from("102.00"),
3219 Quantity::from("5.0"),
3220 0,
3221 ),
3222 0,
3223 0,
3224 ts_init,
3225 ts_init,
3226 )];
3227 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3228
3229 let resolved =
3230 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3231 .unwrap();
3232
3233 assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3236 assert!(book.best_bid_size().unwrap().as_f64() < 5.0); assert!(
3238 book.best_ask_price().is_none()
3239 || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3240 ); assert!(resolved.deltas.len() > 1); }
3245
3246 #[rstest]
3247 fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3248 let instrument = create_test_instrument_any();
3251 let instrument_id = instrument.id();
3252 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3253 let ts_init = get_atomic_clock_realtime().get_time_ns();
3254
3255 let initial_deltas = vec![
3257 OrderBookDelta::new(
3258 instrument_id,
3259 BookAction::Add,
3260 BookOrder::new(
3261 OrderSide::Buy,
3262 Price::from("99.00"),
3263 Quantity::from("1.0"),
3264 0,
3265 ),
3266 0,
3267 0,
3268 ts_init,
3269 ts_init,
3270 ),
3271 OrderBookDelta::new(
3272 instrument_id,
3273 BookAction::Add,
3274 BookOrder::new(
3275 OrderSide::Sell,
3276 Price::from("101.00"),
3277 Quantity::from("5.0"),
3278 0,
3279 ),
3280 0,
3281 0,
3282 ts_init,
3283 ts_init,
3284 ),
3285 ];
3286 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3287 .unwrap();
3288
3289 let crossed_deltas = vec![OrderBookDelta::new(
3291 instrument_id,
3292 BookAction::Add,
3293 BookOrder::new(
3294 OrderSide::Buy,
3295 Price::from("102.00"),
3296 Quantity::from("2.0"),
3297 0,
3298 ),
3299 0,
3300 0,
3301 ts_init,
3302 ts_init,
3303 )];
3304 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3305
3306 let resolved =
3307 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3308 .unwrap();
3309
3310 assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3312 assert!(book.best_ask_size().unwrap().as_f64() < 5.0); assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); assert!(resolved.deltas.len() > 1);
3318 }
3319
3320 #[rstest]
3321 fn test_resolve_crossed_order_book_equal_sizes() {
3322 let instrument = create_test_instrument_any();
3325 let instrument_id = instrument.id();
3326 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3327 let ts_init = get_atomic_clock_realtime().get_time_ns();
3328
3329 let initial_deltas = vec![
3331 OrderBookDelta::new(
3332 instrument_id,
3333 BookAction::Add,
3334 BookOrder::new(
3335 OrderSide::Buy,
3336 Price::from("99.00"),
3337 Quantity::from("1.0"),
3338 0,
3339 ),
3340 0,
3341 0,
3342 ts_init,
3343 ts_init,
3344 ),
3345 OrderBookDelta::new(
3346 instrument_id,
3347 BookAction::Add,
3348 BookOrder::new(
3349 OrderSide::Sell,
3350 Price::from("101.00"),
3351 Quantity::from("3.0"),
3352 0,
3353 ),
3354 0,
3355 0,
3356 ts_init,
3357 ts_init,
3358 ),
3359 ];
3360 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3361 .unwrap();
3362
3363 let crossed_deltas = vec![OrderBookDelta::new(
3365 instrument_id,
3366 BookAction::Add,
3367 BookOrder::new(
3368 OrderSide::Buy,
3369 Price::from("102.00"),
3370 Quantity::from("3.0"),
3371 0,
3372 ),
3373 0,
3374 0,
3375 ts_init,
3376 ts_init,
3377 )];
3378 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3379
3380 let resolved =
3381 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3382 .unwrap();
3383
3384 assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); if let Some(ask_price) = book.best_ask_price() {
3388 assert!(ask_price > book.best_bid_price().unwrap()); }
3390
3391 assert!(resolved.deltas.len() > 1);
3393 }
3394
3395 #[rstest]
3396 fn test_resolve_crossed_order_book_multiple_iterations() {
3397 let instrument = create_test_instrument_any();
3399 let instrument_id = instrument.id();
3400 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3401 let ts_init = get_atomic_clock_realtime().get_time_ns();
3402
3403 let initial_deltas = vec![
3405 OrderBookDelta::new(
3406 instrument_id,
3407 BookAction::Add,
3408 BookOrder::new(
3409 OrderSide::Buy,
3410 Price::from("98.00"),
3411 Quantity::from("1.0"),
3412 0,
3413 ),
3414 0,
3415 0,
3416 ts_init,
3417 ts_init,
3418 ),
3419 OrderBookDelta::new(
3420 instrument_id,
3421 BookAction::Add,
3422 BookOrder::new(
3423 OrderSide::Sell,
3424 Price::from("100.00"),
3425 Quantity::from("1.0"),
3426 0,
3427 ),
3428 0,
3429 0,
3430 ts_init,
3431 ts_init,
3432 ),
3433 OrderBookDelta::new(
3434 instrument_id,
3435 BookAction::Add,
3436 BookOrder::new(
3437 OrderSide::Sell,
3438 Price::from("101.00"),
3439 Quantity::from("1.0"),
3440 0,
3441 ),
3442 0,
3443 0,
3444 ts_init,
3445 ts_init,
3446 ),
3447 ];
3448 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3449 .unwrap();
3450
3451 let crossed_deltas = vec![
3453 OrderBookDelta::new(
3454 instrument_id,
3455 BookAction::Add,
3456 BookOrder::new(
3457 OrderSide::Buy,
3458 Price::from("102.00"),
3459 Quantity::from("1.0"),
3460 0,
3461 ),
3462 0,
3463 0,
3464 ts_init,
3465 ts_init,
3466 ),
3467 OrderBookDelta::new(
3468 instrument_id,
3469 BookAction::Add,
3470 BookOrder::new(
3471 OrderSide::Buy,
3472 Price::from("103.00"),
3473 Quantity::from("1.0"),
3474 0,
3475 ),
3476 0,
3477 0,
3478 ts_init,
3479 ts_init,
3480 ),
3481 ];
3482 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3483
3484 let resolved =
3485 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3486 .unwrap();
3487
3488 if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3490 assert!(ask_price > bid_price, "Book should be uncrossed");
3491 }
3492
3493 assert!(resolved.deltas.len() > 2); }
3496
3497 #[rstest]
3498 fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3499 let instrument = create_test_instrument_any();
3501 let instrument_id = instrument.id();
3502 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3503 let ts_init = get_atomic_clock_realtime().get_time_ns();
3504
3505 let initial_deltas = vec![
3507 OrderBookDelta::new(
3508 instrument_id,
3509 BookAction::Add,
3510 BookOrder::new(
3511 OrderSide::Buy,
3512 Price::from("99.00"),
3513 Quantity::from("1.0"),
3514 0,
3515 ),
3516 0,
3517 0,
3518 ts_init,
3519 ts_init,
3520 ),
3521 OrderBookDelta::new(
3522 instrument_id,
3523 BookAction::Add,
3524 BookOrder::new(
3525 OrderSide::Sell,
3526 Price::from("101.00"),
3527 Quantity::from("1.0"),
3528 0,
3529 ),
3530 0,
3531 0,
3532 ts_init,
3533 ts_init,
3534 ),
3535 ];
3536 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3537 .unwrap();
3538
3539 let new_deltas = vec![OrderBookDelta::new(
3541 instrument_id,
3542 BookAction::Add,
3543 BookOrder::new(
3544 OrderSide::Buy,
3545 Price::from("98.50"),
3546 Quantity::from("2.0"),
3547 0,
3548 ),
3549 0,
3550 0,
3551 ts_init,
3552 ts_init,
3553 )];
3554 let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3555
3556 let original_bid = book.best_bid_price();
3557 let original_ask = book.best_ask_price();
3558
3559 let resolved =
3560 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3561 .unwrap();
3562
3563 assert_eq!(resolved.deltas.len(), new_deltas.len());
3565 assert_eq!(book.best_bid_price(), original_bid);
3566 assert_eq!(book.best_ask_price(), original_ask);
3567 assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3568 }
3569
3570 #[tokio::test]
3571 async fn test_request_instruments_successful_fetch() {
3572 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3574 set_data_event_sender(sender);
3575
3576 let client_id = ClientId::from("DYDX-TEST");
3577 let config = DydxDataClientConfig::default();
3578 let http_client = DydxHttpClient::default();
3579 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3580
3581 let request = RequestInstruments::new(
3582 None,
3583 None,
3584 Some(client_id),
3585 Some(*DYDX_VENUE),
3586 UUID4::new(),
3587 get_atomic_clock_realtime().get_time_ns(),
3588 None,
3589 );
3590
3591 assert!(client.request_instruments(&request).is_ok());
3593
3594 let timeout = tokio::time::Duration::from_secs(5);
3596 let result = tokio::time::timeout(timeout, rx.recv()).await;
3597
3598 match result {
3599 Ok(Some(DataEvent::Response(resp))) => {
3600 if let DataResponse::Instruments(inst_resp) = resp {
3601 assert_eq!(inst_resp.correlation_id, request.request_id);
3603 assert_eq!(inst_resp.client_id, client_id);
3604 assert_eq!(inst_resp.venue, *DYDX_VENUE);
3605 assert!(inst_resp.start.is_none());
3606 assert!(inst_resp.end.is_none());
3607 }
3609 }
3610 Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3611 Ok(None) => panic!("Channel closed unexpectedly"),
3612 Err(_) => {
3613 println!("Test timed out - testnet may be unreachable");
3615 }
3616 }
3617 }
3618
3619 #[tokio::test]
3620 async fn test_request_instruments_empty_response_on_http_error() {
3621 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3623 set_data_event_sender(sender);
3624
3625 let client_id = ClientId::from("DYDX-ERROR-TEST");
3626 let config = DydxDataClientConfig {
3627 base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3628 ..Default::default()
3629 };
3630 let http_client = DydxHttpClient::new(
3631 config.base_url_http.clone(),
3632 config.http_timeout_secs,
3633 config.http_proxy_url.clone(),
3634 config.is_testnet,
3635 None,
3636 )
3637 .unwrap();
3638
3639 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3640
3641 let request = RequestInstruments::new(
3642 None,
3643 None,
3644 Some(client_id),
3645 Some(*DYDX_VENUE),
3646 UUID4::new(),
3647 get_atomic_clock_realtime().get_time_ns(),
3648 None,
3649 );
3650
3651 assert!(client.request_instruments(&request).is_ok());
3652
3653 let timeout = tokio::time::Duration::from_secs(3);
3655 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3656 tokio::time::timeout(timeout, rx.recv()).await
3657 {
3658 assert!(
3659 resp.data.is_empty(),
3660 "Expected empty instruments on HTTP error"
3661 );
3662 assert_eq!(resp.correlation_id, request.request_id);
3663 assert_eq!(resp.client_id, client_id);
3664 }
3665 }
3666
3667 #[tokio::test]
3668 async fn test_request_instruments_caching() {
3669 setup_test_env();
3671
3672 let client_id = ClientId::from("DYDX-CACHE-TEST");
3673 let config = DydxDataClientConfig::default();
3674 let http_client = DydxHttpClient::default();
3675 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3676
3677 let initial_cache_size = client.instruments.len();
3678
3679 let request = RequestInstruments::new(
3680 None,
3681 None,
3682 Some(client_id),
3683 Some(*DYDX_VENUE),
3684 UUID4::new(),
3685 get_atomic_clock_realtime().get_time_ns(),
3686 None,
3687 );
3688
3689 assert!(client.request_instruments(&request).is_ok());
3690
3691 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3693
3694 let final_cache_size = client.instruments.len();
3696 assert!(final_cache_size >= initial_cache_size);
3699 }
3700
3701 #[tokio::test]
3702 async fn test_request_instruments_correlation_id_matching() {
3703 setup_test_env();
3705
3706 let client_id = ClientId::from("DYDX-CORR-TEST");
3707 let config = DydxDataClientConfig::default();
3708 let http_client = DydxHttpClient::default();
3709 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3710
3711 let request_id = UUID4::new();
3712 let request = RequestInstruments::new(
3713 None,
3714 None,
3715 Some(client_id),
3716 Some(*DYDX_VENUE),
3717 request_id,
3718 get_atomic_clock_realtime().get_time_ns(),
3719 None,
3720 );
3721
3722 assert!(client.request_instruments(&request).is_ok());
3724 }
3725
3726 #[tokio::test]
3727 async fn test_request_instruments_venue_assignment() {
3728 setup_test_env();
3730
3731 let client_id = ClientId::from("DYDX-VENUE-TEST");
3732 let config = DydxDataClientConfig::default();
3733 let http_client = DydxHttpClient::default();
3734 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3735
3736 assert_eq!(client.venue(), *DYDX_VENUE);
3737
3738 let request = RequestInstruments::new(
3739 None,
3740 None,
3741 Some(client_id),
3742 Some(*DYDX_VENUE),
3743 UUID4::new(),
3744 get_atomic_clock_realtime().get_time_ns(),
3745 None,
3746 );
3747
3748 assert!(client.request_instruments(&request).is_ok());
3749 }
3750
3751 #[tokio::test]
3752 async fn test_request_instruments_timestamp_handling() {
3753 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3755 set_data_event_sender(sender);
3756
3757 let client_id = ClientId::from("DYDX-TS-TEST");
3758 let config = DydxDataClientConfig::default();
3759 let http_client = DydxHttpClient::default();
3760 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3761
3762 let now = Utc::now();
3763 let start = Some(now - chrono::Duration::hours(24));
3764 let end = Some(now);
3765
3766 let request = RequestInstruments::new(
3767 start,
3768 end,
3769 Some(client_id),
3770 Some(*DYDX_VENUE),
3771 UUID4::new(),
3772 get_atomic_clock_realtime().get_time_ns(),
3773 None,
3774 );
3775
3776 assert!(client.request_instruments(&request).is_ok());
3777
3778 let timeout = tokio::time::Duration::from_secs(3);
3780 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3781 tokio::time::timeout(timeout, rx.recv()).await
3782 {
3783 assert!(resp.start.unwrap() > 0);
3785 assert!(resp.end.unwrap() > 0);
3786 assert!(resp.start.unwrap() <= resp.end.unwrap());
3787 assert!(resp.ts_init > 0);
3788 }
3789 }
3790
3791 #[tokio::test]
3792 async fn test_request_instruments_with_start_only() {
3793 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3795 set_data_event_sender(sender);
3796
3797 let client_id = ClientId::from("DYDX-TS-START-ONLY");
3798 let config = DydxDataClientConfig::default();
3799 let http_client = DydxHttpClient::default();
3800 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3801
3802 let now = Utc::now();
3803 let start = Some(now - chrono::Duration::hours(24));
3804
3805 let request = RequestInstruments::new(
3806 start,
3807 None,
3808 Some(client_id),
3809 Some(*DYDX_VENUE),
3810 UUID4::new(),
3811 get_atomic_clock_realtime().get_time_ns(),
3812 None,
3813 );
3814
3815 assert!(client.request_instruments(&request).is_ok());
3816
3817 let timeout = tokio::time::Duration::from_secs(3);
3818 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3819 tokio::time::timeout(timeout, rx.recv()).await
3820 {
3821 assert!(resp.start.is_some());
3822 assert!(resp.end.is_none());
3823 assert!(resp.ts_init > 0);
3824 }
3825 }
3826
3827 #[tokio::test]
3828 async fn test_request_instruments_with_end_only() {
3829 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3831 set_data_event_sender(sender);
3832
3833 let client_id = ClientId::from("DYDX-TS-END-ONLY");
3834 let config = DydxDataClientConfig::default();
3835 let http_client = DydxHttpClient::default();
3836 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3837
3838 let now = Utc::now();
3839 let end = Some(now);
3840
3841 let request = RequestInstruments::new(
3842 None,
3843 end,
3844 Some(client_id),
3845 Some(*DYDX_VENUE),
3846 UUID4::new(),
3847 get_atomic_clock_realtime().get_time_ns(),
3848 None,
3849 );
3850
3851 assert!(client.request_instruments(&request).is_ok());
3852
3853 let timeout = tokio::time::Duration::from_secs(3);
3854 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3855 tokio::time::timeout(timeout, rx.recv()).await
3856 {
3857 assert!(resp.start.is_none());
3858 assert!(resp.end.is_some());
3859 assert!(resp.ts_init > 0);
3860 }
3861 }
3862
3863 #[tokio::test]
3864 async fn test_request_instruments_client_id_fallback() {
3865 setup_test_env();
3867
3868 let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3869 let config = DydxDataClientConfig::default();
3870 let http_client = DydxHttpClient::default();
3871 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3872
3873 let request = RequestInstruments::new(
3874 None,
3875 None,
3876 None, Some(*DYDX_VENUE),
3878 UUID4::new(),
3879 get_atomic_clock_realtime().get_time_ns(),
3880 None,
3881 );
3882
3883 assert!(client.request_instruments(&request).is_ok());
3885 }
3886
3887 #[tokio::test]
3888 async fn test_request_instruments_with_params() {
3889 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3891 set_data_event_sender(sender);
3892
3893 let client_id = ClientId::from("DYDX-PARAMS-TEST");
3894 let config = DydxDataClientConfig::default();
3895 let http_client = DydxHttpClient::default();
3896 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3897
3898 let mut params_map = IndexMap::new();
3900 params_map.insert("test_key".to_string(), "test_value".to_string());
3901
3902 let request = RequestInstruments::new(
3903 None,
3904 None,
3905 Some(client_id),
3906 Some(*DYDX_VENUE),
3907 UUID4::new(),
3908 get_atomic_clock_realtime().get_time_ns(),
3909 Some(params_map),
3910 );
3911
3912 assert!(client.request_instruments(&request).is_ok());
3913
3914 let timeout = tokio::time::Duration::from_secs(3);
3916 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3917 tokio::time::timeout(timeout, rx.recv()).await
3918 {
3919 assert_eq!(resp.client_id, client_id);
3921 let params = resp
3922 .params
3923 .expect("expected params to be present in InstrumentsResponse");
3924 assert_eq!(
3925 params.get("test_key").map(String::as_str),
3926 Some("test_value")
3927 );
3928 }
3929 }
3930
3931 #[tokio::test]
3932 async fn test_request_instruments_with_start_and_end_range() {
3933 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3935 set_data_event_sender(sender);
3936
3937 let client_id = ClientId::from("DYDX-START-END-RANGE");
3938 let config = DydxDataClientConfig::default();
3939 let http_client = DydxHttpClient::default();
3940 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3941
3942 let now = Utc::now();
3943 let start = Some(now - chrono::Duration::hours(48));
3944 let end = Some(now - chrono::Duration::hours(24));
3945
3946 let request = RequestInstruments::new(
3947 start,
3948 end,
3949 Some(client_id),
3950 Some(*DYDX_VENUE),
3951 UUID4::new(),
3952 get_atomic_clock_realtime().get_time_ns(),
3953 None,
3954 );
3955
3956 assert!(client.request_instruments(&request).is_ok());
3957
3958 let timeout = tokio::time::Duration::from_secs(3);
3959 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3960 tokio::time::timeout(timeout, rx.recv()).await
3961 {
3962 assert!(
3964 resp.start.is_some(),
3965 "start timestamp should be present when provided"
3966 );
3967 assert!(
3968 resp.end.is_some(),
3969 "end timestamp should be present when provided"
3970 );
3971 assert!(resp.ts_init > 0, "ts_init should always be set");
3972
3973 if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3975 assert!(
3976 start_ts < end_ts,
3977 "start timestamp should be before end timestamp"
3978 );
3979 }
3980 }
3981 }
3982
3983 #[tokio::test]
3984 async fn test_request_instruments_different_client_ids() {
3985 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3987 set_data_event_sender(sender);
3988
3989 let timeout = tokio::time::Duration::from_secs(3);
3990
3991 let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3993 let config1 = DydxDataClientConfig::default();
3994 let http_client1 = DydxHttpClient::default();
3995 let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3996
3997 let request1 = RequestInstruments::new(
3998 None,
3999 None,
4000 Some(client_id_1),
4001 Some(*DYDX_VENUE),
4002 UUID4::new(),
4003 get_atomic_clock_realtime().get_time_ns(),
4004 None,
4005 );
4006
4007 assert!(client1.request_instruments(&request1).is_ok());
4008
4009 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
4010 tokio::time::timeout(timeout, rx.recv()).await
4011 {
4012 assert_eq!(
4013 resp1.client_id, client_id_1,
4014 "Response should contain client_id_1"
4015 );
4016 }
4017
4018 let client_id_2 = ClientId::from("DYDX-CLIENT-2");
4020 let config2 = DydxDataClientConfig::default();
4021 let http_client2 = DydxHttpClient::default();
4022 let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
4023
4024 let request2 = RequestInstruments::new(
4025 None,
4026 None,
4027 Some(client_id_2),
4028 Some(*DYDX_VENUE),
4029 UUID4::new(),
4030 get_atomic_clock_realtime().get_time_ns(),
4031 None,
4032 );
4033
4034 assert!(client2.request_instruments(&request2).is_ok());
4035
4036 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
4037 tokio::time::timeout(timeout, rx.recv()).await
4038 {
4039 assert_eq!(
4040 resp2.client_id, client_id_2,
4041 "Response should contain client_id_2"
4042 );
4043 assert_ne!(
4044 resp2.client_id, client_id_1,
4045 "Different clients should have different client_ids"
4046 );
4047 }
4048 }
4049
4050 #[tokio::test]
4051 async fn test_request_instruments_no_timestamps() {
4052 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4054 set_data_event_sender(sender);
4055
4056 let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
4057 let config = DydxDataClientConfig::default();
4058 let http_client = DydxHttpClient::default();
4059 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4060
4061 let request = RequestInstruments::new(
4062 None, None, Some(client_id),
4065 Some(*DYDX_VENUE),
4066 UUID4::new(),
4067 get_atomic_clock_realtime().get_time_ns(),
4068 None,
4069 );
4070
4071 assert!(client.request_instruments(&request).is_ok());
4072
4073 let timeout = tokio::time::Duration::from_secs(5);
4074 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4075 tokio::time::timeout(timeout, rx.recv()).await
4076 {
4077 assert!(
4079 resp.start.is_none(),
4080 "start should be None when not provided"
4081 );
4082 assert!(resp.end.is_none(), "end should be None when not provided");
4083
4084 assert_eq!(resp.venue, *DYDX_VENUE);
4086 assert_eq!(resp.client_id, client_id);
4087 assert!(resp.ts_init > 0);
4088 }
4089 }
4090
4091 #[tokio::test]
4092 async fn test_request_instrument_cache_hit() {
4093 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4095 set_data_event_sender(sender);
4096
4097 let client_id = ClientId::from("DYDX-CACHE-HIT");
4098 let config = DydxDataClientConfig::default();
4099 let http_client = DydxHttpClient::default();
4100 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4101
4102 let instrument = create_test_instrument_any();
4104 let instrument_id = instrument.id();
4105 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4106 client.instruments.insert(symbol_key, instrument.clone());
4107
4108 let request = RequestInstrument::new(
4109 instrument_id,
4110 None,
4111 None,
4112 Some(client_id),
4113 UUID4::new(),
4114 get_atomic_clock_realtime().get_time_ns(),
4115 None,
4116 );
4117
4118 assert!(client.request_instrument(&request).is_ok());
4119
4120 let timeout = tokio::time::Duration::from_millis(500);
4122 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4123 tokio::time::timeout(timeout, rx.recv()).await
4124 {
4125 assert_eq!(resp.instrument_id, instrument_id);
4126 assert_eq!(resp.client_id, client_id);
4127 assert_eq!(resp.data.id(), instrument_id);
4128 }
4129 }
4130
4131 #[tokio::test]
4132 async fn test_request_instrument_cache_miss() {
4133 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4135 set_data_event_sender(sender);
4136
4137 let client_id = ClientId::from("DYDX-CACHE-MISS");
4138 let config = DydxDataClientConfig::default();
4139 let http_client = DydxHttpClient::default();
4140 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4141
4142 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4143
4144 let request = RequestInstrument::new(
4145 instrument_id,
4146 None,
4147 None,
4148 Some(client_id),
4149 UUID4::new(),
4150 get_atomic_clock_realtime().get_time_ns(),
4151 None,
4152 );
4153
4154 assert!(client.request_instrument(&request).is_ok());
4155
4156 let timeout = tokio::time::Duration::from_secs(5);
4158 let result = tokio::time::timeout(timeout, rx.recv()).await;
4159
4160 match result {
4162 Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4163 assert_eq!(resp.instrument_id, instrument_id);
4164 assert_eq!(resp.client_id, client_id);
4165 }
4166 Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4167 Ok(None) => panic!("Channel closed unexpectedly"),
4168 Err(_) => {
4169 println!("Test timed out - testnet may be unreachable");
4170 }
4171 }
4172 }
4173
4174 #[tokio::test]
4175 async fn test_request_instrument_not_found() {
4176 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4178 set_data_event_sender(sender);
4179
4180 let client_id = ClientId::from("DYDX-NOT-FOUND");
4181 let config = DydxDataClientConfig {
4182 base_url_http: Some("http://invalid-url.local".to_string()),
4183 ..Default::default()
4184 };
4185 let http_client = DydxHttpClient::new(
4186 config.base_url_http.clone(),
4187 config.http_timeout_secs,
4188 config.http_proxy_url.clone(),
4189 config.is_testnet,
4190 None,
4191 )
4192 .unwrap();
4193
4194 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4195
4196 let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4197
4198 let request = RequestInstrument::new(
4199 instrument_id,
4200 None,
4201 None,
4202 Some(client_id),
4203 UUID4::new(),
4204 get_atomic_clock_realtime().get_time_ns(),
4205 None,
4206 );
4207
4208 assert!(client.request_instrument(&request).is_ok());
4210
4211 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4213 }
4214
4215 #[tokio::test]
4216 async fn test_request_instrument_bulk_caching() {
4217 setup_test_env();
4219
4220 let client_id = ClientId::from("DYDX-BULK-CACHE");
4221 let config = DydxDataClientConfig::default();
4222 let http_client = DydxHttpClient::default();
4223 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4224
4225 let initial_cache_size = client.instruments.len();
4226
4227 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4228
4229 let request = RequestInstrument::new(
4230 instrument_id,
4231 None,
4232 None,
4233 Some(client_id),
4234 UUID4::new(),
4235 get_atomic_clock_realtime().get_time_ns(),
4236 None,
4237 );
4238
4239 assert!(client.request_instrument(&request).is_ok());
4240
4241 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4243
4244 let final_cache_size = client.instruments.len();
4246 assert!(final_cache_size >= initial_cache_size);
4247 }
4249
4250 #[tokio::test]
4251 async fn test_request_instrument_correlation_id() {
4252 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4254 set_data_event_sender(sender);
4255
4256 let client_id = ClientId::from("DYDX-CORR-ID");
4257 let config = DydxDataClientConfig::default();
4258 let http_client = DydxHttpClient::default();
4259 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4260
4261 let instrument = create_test_instrument_any();
4263 let instrument_id = instrument.id();
4264 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4265 client.instruments.insert(symbol_key, instrument.clone());
4266
4267 let request_id = UUID4::new();
4268 let request = RequestInstrument::new(
4269 instrument_id,
4270 None,
4271 None,
4272 Some(client_id),
4273 request_id,
4274 get_atomic_clock_realtime().get_time_ns(),
4275 None,
4276 );
4277
4278 assert!(client.request_instrument(&request).is_ok());
4279
4280 let timeout = tokio::time::Duration::from_millis(500);
4282 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4283 tokio::time::timeout(timeout, rx.recv()).await
4284 {
4285 assert_eq!(resp.correlation_id, request_id);
4286 }
4287 }
4288
4289 #[tokio::test]
4290 async fn test_request_instrument_response_format_boxed() {
4291 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4293 set_data_event_sender(sender);
4294
4295 let client_id = ClientId::from("DYDX-BOXED");
4296 let config = DydxDataClientConfig::default();
4297 let http_client = DydxHttpClient::default();
4298 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4299
4300 let instrument = create_test_instrument_any();
4302 let instrument_id = instrument.id();
4303 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4304 client.instruments.insert(symbol_key, instrument.clone());
4305
4306 let request = RequestInstrument::new(
4307 instrument_id,
4308 None,
4309 None,
4310 Some(client_id),
4311 UUID4::new(),
4312 get_atomic_clock_realtime().get_time_ns(),
4313 None,
4314 );
4315
4316 assert!(client.request_instrument(&request).is_ok());
4317
4318 let timeout = tokio::time::Duration::from_millis(500);
4320 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4321 tokio::time::timeout(timeout, rx.recv()).await
4322 {
4323 assert_eq!(boxed_resp.instrument_id, instrument_id);
4325 assert_eq!(boxed_resp.client_id, client_id);
4326 assert!(boxed_resp.start.is_none());
4327 assert!(boxed_resp.end.is_none());
4328 assert!(boxed_resp.ts_init > 0);
4329 }
4330 }
4331
4332 #[rstest]
4333 fn test_request_instrument_symbol_extraction() {
4334 setup_test_env();
4336
4337 let client_id = ClientId::from("DYDX-SYMBOL");
4338 let config = DydxDataClientConfig::default();
4339 let http_client = DydxHttpClient::default();
4340 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4341
4342 let test_cases = vec![
4345 ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4346 ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4347 ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4348 ];
4349
4350 for (instrument_id_str, expected_symbol) in test_cases {
4351 let instrument_id = InstrumentId::from(instrument_id_str);
4352 let symbol = Ustr::from(instrument_id.symbol.as_str());
4353 assert_eq!(symbol.as_str(), expected_symbol);
4354 }
4355 }
4356
4357 #[tokio::test]
4358 async fn test_request_instrument_client_id_fallback() {
4359 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4361 set_data_event_sender(sender);
4362
4363 let client_id = ClientId::from("DYDX-FALLBACK");
4364 let config = DydxDataClientConfig::default();
4365 let http_client = DydxHttpClient::default();
4366 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4367
4368 let instrument = create_test_instrument_any();
4370 let instrument_id = instrument.id();
4371 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4372 client.instruments.insert(symbol_key, instrument.clone());
4373
4374 let request = RequestInstrument::new(
4375 instrument_id,
4376 None,
4377 None,
4378 None, UUID4::new(),
4380 get_atomic_clock_realtime().get_time_ns(),
4381 None,
4382 );
4383
4384 assert!(client.request_instrument(&request).is_ok());
4385
4386 let timeout = tokio::time::Duration::from_millis(500);
4388 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4389 tokio::time::timeout(timeout, rx.recv()).await
4390 {
4391 assert_eq!(resp.client_id, client_id);
4392 }
4393 }
4394
4395 #[tokio::test]
4396 async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4397 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4398 set_data_event_sender(sender);
4399
4400 let created_at = Utc::now();
4401
4402 let http_trade = crate::http::models::Trade {
4403 id: "trade-1".to_string(),
4404 side: OrderSide::Buy,
4405 size: dec!(1.5),
4406 price: dec!(100.25),
4407 created_at,
4408 created_at_height: 1,
4409 trade_type: crate::common::enums::DydxTradeType::Limit,
4410 };
4411
4412 let trades_response = crate::http::models::TradesResponse {
4413 trades: vec![http_trade],
4414 };
4415
4416 let state = TradesTestState {
4417 response: Arc::new(trades_response),
4418 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4419 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4420 };
4421
4422 let addr = start_trades_test_server(state.clone()).await;
4423 let base_url = format!("http://{addr}");
4424
4425 let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4426 let config = DydxDataClientConfig {
4427 base_url_http: Some(base_url),
4428 is_testnet: true,
4429 ..Default::default()
4430 };
4431
4432 let http_client = DydxHttpClient::new(
4433 config.base_url_http.clone(),
4434 config.http_timeout_secs,
4435 config.http_proxy_url.clone(),
4436 config.is_testnet,
4437 None,
4438 )
4439 .unwrap();
4440
4441 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4442
4443 let instrument = create_test_instrument_any();
4444 let instrument_id = instrument.id();
4445 let price_precision = instrument.price_precision();
4446 let size_precision = instrument.size_precision();
4447 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4448 client.instruments.insert(symbol_key, instrument);
4449
4450 let request_id = UUID4::new();
4451 let now = Utc::now();
4452 let start = Some(now - chrono::Duration::seconds(10));
4453 let end = Some(now + chrono::Duration::seconds(10));
4454 let limit = std::num::NonZeroUsize::new(100).unwrap();
4455
4456 let request = RequestTrades::new(
4457 instrument_id,
4458 start,
4459 end,
4460 Some(limit),
4461 Some(client_id),
4462 request_id,
4463 get_atomic_clock_realtime().get_time_ns(),
4464 None,
4465 );
4466
4467 assert!(client.request_trades(&request).is_ok());
4468
4469 let timeout = tokio::time::Duration::from_secs(1);
4470 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4471 tokio::time::timeout(timeout, rx.recv()).await
4472 {
4473 assert_eq!(resp.correlation_id, request_id);
4474 assert_eq!(resp.client_id, client_id);
4475 assert_eq!(resp.instrument_id, instrument_id);
4476 assert_eq!(resp.data.len(), 1);
4477
4478 let tick = &resp.data[0];
4479 assert_eq!(tick.instrument_id, instrument_id);
4480 assert_eq!(tick.price, Price::new(100.25, price_precision));
4481 assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4482 assert_eq!(tick.trade_id.to_string(), "trade-1");
4483
4484 use nautilus_model::enums::AggressorSide;
4485 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4486 } else {
4487 panic!("did not receive trades response in time");
4488 }
4489
4490 let last_ticker = state.last_ticker.lock().await.clone();
4492 assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4493
4494 let last_limit = *state.last_limit.lock().await;
4495 assert_eq!(last_limit, Some(Some(100)));
4496 }
4497
4498 #[tokio::test]
4499 async fn test_request_trades_empty_response_and_no_limit() {
4500 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4501 set_data_event_sender(sender);
4502
4503 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4504
4505 let state = TradesTestState {
4506 response: Arc::new(trades_response),
4507 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4508 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4509 };
4510
4511 let addr = start_trades_test_server(state.clone()).await;
4512 let base_url = format!("http://{addr}");
4513
4514 let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4515 let config = DydxDataClientConfig {
4516 base_url_http: Some(base_url),
4517 is_testnet: true,
4518 ..Default::default()
4519 };
4520
4521 let http_client = DydxHttpClient::new(
4522 config.base_url_http.clone(),
4523 config.http_timeout_secs,
4524 config.http_proxy_url.clone(),
4525 config.is_testnet,
4526 None,
4527 )
4528 .unwrap();
4529
4530 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4531
4532 let instrument = create_test_instrument_any();
4533 let instrument_id = instrument.id();
4534 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4535 client.instruments.insert(symbol_key, instrument);
4536
4537 let request_id = UUID4::new();
4538
4539 let request = RequestTrades::new(
4540 instrument_id,
4541 None,
4542 None,
4543 None, Some(client_id),
4545 request_id,
4546 get_atomic_clock_realtime().get_time_ns(),
4547 None,
4548 );
4549
4550 assert!(client.request_trades(&request).is_ok());
4551
4552 let timeout = tokio::time::Duration::from_secs(1);
4553 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4554 tokio::time::timeout(timeout, rx.recv()).await
4555 {
4556 assert_eq!(resp.correlation_id, request_id);
4557 assert_eq!(resp.client_id, client_id);
4558 assert_eq!(resp.instrument_id, instrument_id);
4559 assert!(resp.data.is_empty());
4560 } else {
4561 panic!("did not receive trades response in time");
4562 }
4563
4564 let last_limit = *state.last_limit.lock().await;
4566 assert_eq!(last_limit, Some(None));
4567 }
4568
4569 #[tokio::test]
4570 async fn test_request_trades_timestamp_filtering() {
4571 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4572 set_data_event_sender(sender);
4573
4574 let now = Utc::now();
4575 let trade_before = crate::http::models::Trade {
4576 id: "before".to_string(),
4577 side: OrderSide::Buy,
4578 size: dec!(1.0),
4579 price: dec!(100.0),
4580 created_at: now - chrono::Duration::seconds(60),
4581 created_at_height: 1,
4582 trade_type: crate::common::enums::DydxTradeType::Limit,
4583 };
4584 let trade_inside = crate::http::models::Trade {
4585 id: "inside".to_string(),
4586 side: OrderSide::Sell,
4587 size: dec!(2.0),
4588 price: dec!(101.0),
4589 created_at: now,
4590 created_at_height: 2,
4591 trade_type: crate::common::enums::DydxTradeType::Limit,
4592 };
4593 let trade_after = crate::http::models::Trade {
4594 id: "after".to_string(),
4595 side: OrderSide::Buy,
4596 size: dec!(3.0),
4597 price: dec!(102.0),
4598 created_at: now + chrono::Duration::seconds(60),
4599 created_at_height: 3,
4600 trade_type: crate::common::enums::DydxTradeType::Limit,
4601 };
4602
4603 let trades_response = crate::http::models::TradesResponse {
4604 trades: vec![trade_before, trade_inside.clone(), trade_after],
4605 };
4606
4607 let state = TradesTestState {
4608 response: Arc::new(trades_response),
4609 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4610 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4611 };
4612
4613 let addr = start_trades_test_server(state).await;
4614 let base_url = format!("http://{addr}");
4615
4616 let client_id = ClientId::from("DYDX-TRADES-FILTER");
4617 let config = DydxDataClientConfig {
4618 base_url_http: Some(base_url),
4619 is_testnet: true,
4620 ..Default::default()
4621 };
4622
4623 let http_client = DydxHttpClient::new(
4624 config.base_url_http.clone(),
4625 config.http_timeout_secs,
4626 config.http_proxy_url.clone(),
4627 config.is_testnet,
4628 None,
4629 )
4630 .unwrap();
4631
4632 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4633
4634 let instrument = create_test_instrument_any();
4635 let instrument_id = instrument.id();
4636 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4637 client.instruments.insert(symbol_key, instrument);
4638
4639 let request_id = UUID4::new();
4640
4641 let start = Some(now - chrono::Duration::seconds(10));
4643 let end = Some(now + chrono::Duration::seconds(10));
4644
4645 let request = RequestTrades::new(
4646 instrument_id,
4647 start,
4648 end,
4649 None,
4650 Some(client_id),
4651 request_id,
4652 get_atomic_clock_realtime().get_time_ns(),
4653 None,
4654 );
4655
4656 assert!(client.request_trades(&request).is_ok());
4657
4658 let timeout = tokio::time::Duration::from_secs(1);
4659 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4660 tokio::time::timeout(timeout, rx.recv()).await
4661 {
4662 assert_eq!(resp.correlation_id, request_id);
4663 assert_eq!(resp.client_id, client_id);
4664 assert_eq!(resp.instrument_id, instrument_id);
4665 assert_eq!(resp.data.len(), 1);
4666
4667 let tick = &resp.data[0];
4668 assert_eq!(tick.trade_id.to_string(), "inside");
4669 assert_eq!(tick.price.as_decimal(), dec!(101.0));
4670 } else {
4671 panic!("did not receive trades response in time");
4672 }
4673 }
4674
4675 #[tokio::test]
4676 async fn test_request_trades_correlation_id_matching() {
4677 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4679 set_data_event_sender(sender);
4680
4681 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4682
4683 let state = TradesTestState {
4684 response: Arc::new(trades_response),
4685 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4686 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4687 };
4688
4689 let addr = start_trades_test_server(state).await;
4690 let base_url = format!("http://{addr}");
4691
4692 let client_id = ClientId::from("DYDX-TRADES-CORR");
4693 let config = DydxDataClientConfig {
4694 base_url_http: Some(base_url),
4695 is_testnet: true,
4696 ..Default::default()
4697 };
4698
4699 let http_client = DydxHttpClient::new(
4700 config.base_url_http.clone(),
4701 config.http_timeout_secs,
4702 config.http_proxy_url.clone(),
4703 config.is_testnet,
4704 None,
4705 )
4706 .unwrap();
4707
4708 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4709
4710 let instrument = create_test_instrument_any();
4711 let instrument_id = instrument.id();
4712 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4713 client.instruments.insert(symbol_key, instrument);
4714
4715 let request_id = UUID4::new();
4716 let request = RequestTrades::new(
4717 instrument_id,
4718 None,
4719 None,
4720 None,
4721 Some(client_id),
4722 request_id,
4723 get_atomic_clock_realtime().get_time_ns(),
4724 None,
4725 );
4726
4727 assert!(client.request_trades(&request).is_ok());
4728
4729 let timeout = tokio::time::Duration::from_millis(500);
4730 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4731 tokio::time::timeout(timeout, rx.recv()).await
4732 {
4733 assert_eq!(resp.correlation_id, request_id);
4734 }
4735 }
4736
4737 #[tokio::test]
4738 async fn test_request_trades_response_format() {
4739 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4741 set_data_event_sender(sender);
4742
4743 let created_at = Utc::now();
4744 let http_trade = crate::http::models::Trade {
4745 id: "format-test".to_string(),
4746 side: OrderSide::Sell,
4747 size: dec!(5.0),
4748 price: dec!(200.0),
4749 created_at,
4750 created_at_height: 100,
4751 trade_type: crate::common::enums::DydxTradeType::Limit,
4752 };
4753
4754 let trades_response = crate::http::models::TradesResponse {
4755 trades: vec![http_trade],
4756 };
4757
4758 let state = TradesTestState {
4759 response: Arc::new(trades_response),
4760 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4761 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4762 };
4763
4764 let addr = start_trades_test_server(state).await;
4765 let base_url = format!("http://{addr}");
4766
4767 let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4768 let config = DydxDataClientConfig {
4769 base_url_http: Some(base_url),
4770 is_testnet: true,
4771 ..Default::default()
4772 };
4773
4774 let http_client = DydxHttpClient::new(
4775 config.base_url_http.clone(),
4776 config.http_timeout_secs,
4777 config.http_proxy_url.clone(),
4778 config.is_testnet,
4779 None,
4780 )
4781 .unwrap();
4782
4783 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4784
4785 let instrument = create_test_instrument_any();
4786 let instrument_id = instrument.id();
4787 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4788 client.instruments.insert(symbol_key, instrument);
4789
4790 let request = RequestTrades::new(
4791 instrument_id,
4792 None,
4793 None,
4794 None,
4795 Some(client_id),
4796 UUID4::new(),
4797 get_atomic_clock_realtime().get_time_ns(),
4798 None,
4799 );
4800
4801 assert!(client.request_trades(&request).is_ok());
4802
4803 let timeout = tokio::time::Duration::from_millis(500);
4804 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4805 tokio::time::timeout(timeout, rx.recv()).await
4806 {
4807 assert_eq!(resp.client_id, client_id);
4809 assert_eq!(resp.instrument_id, instrument_id);
4810 assert!(resp.data.len() == 1);
4811 assert!(resp.ts_init > 0);
4812
4813 let tick = &resp.data[0];
4815 assert_eq!(tick.instrument_id, instrument_id);
4816 assert!(tick.ts_event > 0);
4817 assert!(tick.ts_init > 0);
4818 }
4819 }
4820
4821 #[tokio::test]
4822 async fn test_request_trades_no_instrument_in_cache() {
4823 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4825 set_data_event_sender(sender);
4826
4827 let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4828 let config = DydxDataClientConfig::default();
4829 let http_client = DydxHttpClient::default();
4830 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4831
4832 let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4834
4835 let request = RequestTrades::new(
4836 instrument_id,
4837 None,
4838 None,
4839 None,
4840 Some(client_id),
4841 UUID4::new(),
4842 get_atomic_clock_realtime().get_time_ns(),
4843 None,
4844 );
4845
4846 assert!(client.request_trades(&request).is_ok());
4847
4848 let timeout = tokio::time::Duration::from_millis(500);
4850 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4851 tokio::time::timeout(timeout, rx.recv()).await
4852 {
4853 assert!(resp.data.is_empty());
4854 }
4855 }
4856
4857 #[tokio::test]
4858 async fn test_request_trades_limit_parameter() {
4859 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4861 set_data_event_sender(sender);
4862
4863 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4864
4865 let state = TradesTestState {
4866 response: Arc::new(trades_response),
4867 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4868 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4869 };
4870
4871 let addr = start_trades_test_server(state.clone()).await;
4872 let base_url = format!("http://{addr}");
4873
4874 let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4875 let config = DydxDataClientConfig {
4876 base_url_http: Some(base_url),
4877 is_testnet: true,
4878 ..Default::default()
4879 };
4880
4881 let http_client = DydxHttpClient::new(
4882 config.base_url_http.clone(),
4883 config.http_timeout_secs,
4884 config.http_proxy_url.clone(),
4885 config.is_testnet,
4886 None,
4887 )
4888 .unwrap();
4889
4890 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4891
4892 let instrument = create_test_instrument_any();
4893 let instrument_id = instrument.id();
4894 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4895 client.instruments.insert(symbol_key, instrument);
4896
4897 let limit = std::num::NonZeroUsize::new(500).unwrap();
4899 let request = RequestTrades::new(
4900 instrument_id,
4901 None,
4902 None,
4903 Some(limit),
4904 Some(client_id),
4905 UUID4::new(),
4906 get_atomic_clock_realtime().get_time_ns(),
4907 None,
4908 );
4909
4910 assert!(client.request_trades(&request).is_ok());
4911
4912 let state_clone = state.clone();
4913 wait_until_async(
4914 || async { state_clone.last_limit.lock().await.is_some() },
4915 Duration::from_secs(5),
4916 )
4917 .await;
4918
4919 let last_limit = *state.last_limit.lock().await;
4921 assert_eq!(last_limit, Some(Some(500)));
4922 }
4923
4924 #[rstest]
4925 fn test_request_trades_symbol_conversion() {
4926 setup_test_env();
4928
4929 let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4930 let config = DydxDataClientConfig::default();
4931 let http_client = DydxHttpClient::default();
4932 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4933
4934 let test_cases = vec![
4936 ("BTC-USD-PERP.DYDX", "BTC-USD"),
4937 ("ETH-USD-PERP.DYDX", "ETH-USD"),
4938 ("SOL-USD-PERP.DYDX", "SOL-USD"),
4939 ];
4940
4941 for (instrument_id_str, expected_ticker) in test_cases {
4942 let instrument_id = InstrumentId::from(instrument_id_str);
4943 let ticker = instrument_id
4944 .symbol
4945 .as_str()
4946 .trim_end_matches("-PERP")
4947 .to_string();
4948 assert_eq!(ticker, expected_ticker);
4949 }
4950 }
4951
4952 #[tokio::test]
4953 async fn test_http_404_handling() {
4954 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4956 set_data_event_sender(sender);
4957
4958 let client_id = ClientId::from("DYDX-404");
4959 let config = DydxDataClientConfig {
4960 base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4961 http_timeout_secs: Some(1),
4962 ..Default::default()
4963 };
4964
4965 let http_client = DydxHttpClient::new(
4966 config.base_url_http.clone(),
4967 config.http_timeout_secs,
4968 config.http_proxy_url.clone(),
4969 config.is_testnet,
4970 None,
4971 )
4972 .unwrap();
4973
4974 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4975
4976 let request = RequestInstruments::new(
4977 None,
4978 None,
4979 Some(client_id),
4980 Some(*DYDX_VENUE),
4981 UUID4::new(),
4982 get_atomic_clock_realtime().get_time_ns(),
4983 None,
4984 );
4985
4986 assert!(client.request_instruments(&request).is_ok());
4987
4988 let timeout = tokio::time::Duration::from_secs(2);
4990 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4991 tokio::time::timeout(timeout, rx.recv()).await
4992 {
4993 assert!(resp.data.is_empty(), "Expected empty response on 404");
4994 }
4995 }
4996
4997 #[tokio::test]
4998 async fn test_http_500_handling() {
4999 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5001 set_data_event_sender(sender);
5002
5003 let client_id = ClientId::from("DYDX-500");
5004 let config = DydxDataClientConfig {
5005 base_url_http: Some("http://httpstat.us/500".to_string()),
5006 http_timeout_secs: Some(2),
5007 ..Default::default()
5008 };
5009
5010 let http_client = DydxHttpClient::new(
5011 config.base_url_http.clone(),
5012 config.http_timeout_secs,
5013 config.http_proxy_url.clone(),
5014 config.is_testnet,
5015 None,
5016 )
5017 .unwrap();
5018
5019 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5020
5021 let instrument = create_test_instrument_any();
5022 let instrument_id = instrument.id();
5023 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5024 client.instruments.insert(symbol_key, instrument);
5025
5026 let request = RequestTrades::new(
5027 instrument_id,
5028 None,
5029 None,
5030 None,
5031 Some(client_id),
5032 UUID4::new(),
5033 get_atomic_clock_realtime().get_time_ns(),
5034 None,
5035 );
5036
5037 assert!(client.request_trades(&request).is_ok());
5038
5039 let timeout = tokio::time::Duration::from_secs(3);
5041 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5042 tokio::time::timeout(timeout, rx.recv()).await
5043 {
5044 assert!(resp.data.is_empty(), "Expected empty response on 500");
5045 }
5046 }
5047
5048 #[tokio::test]
5049 async fn test_network_timeout_handling() {
5050 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5052 set_data_event_sender(sender);
5053
5054 let client_id = ClientId::from("DYDX-TIMEOUT");
5055 let config = DydxDataClientConfig {
5056 base_url_http: Some("http://10.255.255.1:81".to_string()), http_timeout_secs: Some(1), ..Default::default()
5059 };
5060
5061 let http_client = DydxHttpClient::new(
5062 config.base_url_http.clone(),
5063 config.http_timeout_secs,
5064 config.http_proxy_url.clone(),
5065 config.is_testnet,
5066 None,
5067 )
5068 .unwrap();
5069
5070 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5071
5072 let request = RequestInstruments::new(
5073 None,
5074 None,
5075 Some(client_id),
5076 Some(*DYDX_VENUE),
5077 UUID4::new(),
5078 get_atomic_clock_realtime().get_time_ns(),
5079 None,
5080 );
5081
5082 assert!(client.request_instruments(&request).is_ok());
5083
5084 let timeout = tokio::time::Duration::from_secs(3);
5086 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5087 tokio::time::timeout(timeout, rx.recv()).await
5088 {
5089 assert!(resp.data.is_empty(), "Expected empty response on timeout");
5090 }
5091 }
5092
5093 #[tokio::test]
5094 async fn test_connection_refused_handling() {
5095 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5097 set_data_event_sender(sender);
5098
5099 let client_id = ClientId::from("DYDX-REFUSED");
5100 let config = DydxDataClientConfig {
5101 base_url_http: Some("http://localhost:9999".to_string()), http_timeout_secs: Some(1),
5103 ..Default::default()
5104 };
5105
5106 let http_client = DydxHttpClient::new(
5107 config.base_url_http.clone(),
5108 config.http_timeout_secs,
5109 config.http_proxy_url.clone(),
5110 config.is_testnet,
5111 None,
5112 )
5113 .unwrap();
5114
5115 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5116
5117 let instrument = create_test_instrument_any();
5118 let instrument_id = instrument.id();
5119 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5120 client.instruments.insert(symbol_key, instrument);
5121
5122 let request = RequestInstrument::new(
5123 instrument_id,
5124 None,
5125 None,
5126 Some(client_id),
5127 UUID4::new(),
5128 get_atomic_clock_realtime().get_time_ns(),
5129 None,
5130 );
5131
5132 assert!(client.request_instrument(&request).is_ok());
5133
5134 let timeout = tokio::time::Duration::from_secs(2);
5136 let result = tokio::time::timeout(timeout, rx.recv()).await;
5137
5138 match result {
5141 Ok(Some(DataEvent::Response(_))) => {
5142 }
5144 Ok(None) | Err(_) => {
5145 }
5147 _ => {}
5148 }
5149 }
5150
5151 #[tokio::test]
5152 async fn test_dns_resolution_failure_handling() {
5153 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5155 set_data_event_sender(sender);
5156
5157 let client_id = ClientId::from("DYDX-DNS");
5158 let config = DydxDataClientConfig {
5159 base_url_http: Some(
5160 "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5161 ),
5162 http_timeout_secs: Some(2),
5163 ..Default::default()
5164 };
5165
5166 let http_client = DydxHttpClient::new(
5167 config.base_url_http.clone(),
5168 config.http_timeout_secs,
5169 config.http_proxy_url.clone(),
5170 config.is_testnet,
5171 None,
5172 )
5173 .unwrap();
5174
5175 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5176
5177 let request = RequestInstruments::new(
5178 None,
5179 None,
5180 Some(client_id),
5181 Some(*DYDX_VENUE),
5182 UUID4::new(),
5183 get_atomic_clock_realtime().get_time_ns(),
5184 None,
5185 );
5186
5187 assert!(client.request_instruments(&request).is_ok());
5188
5189 let timeout = tokio::time::Duration::from_secs(3);
5191 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5192 tokio::time::timeout(timeout, rx.recv()).await
5193 {
5194 assert!(
5195 resp.data.is_empty(),
5196 "Expected empty response on DNS failure"
5197 );
5198 }
5199 }
5200
5201 #[tokio::test]
5202 async fn test_http_502_503_handling() {
5203 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5205 set_data_event_sender(sender);
5206
5207 let client_id = ClientId::from("DYDX-503");
5208 let config = DydxDataClientConfig {
5209 base_url_http: Some("http://httpstat.us/503".to_string()),
5210 http_timeout_secs: Some(2),
5211 ..Default::default()
5212 };
5213
5214 let http_client = DydxHttpClient::new(
5215 config.base_url_http.clone(),
5216 config.http_timeout_secs,
5217 config.http_proxy_url.clone(),
5218 config.is_testnet,
5219 None,
5220 )
5221 .unwrap();
5222
5223 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5224
5225 let request = RequestInstruments::new(
5226 None,
5227 None,
5228 Some(client_id),
5229 Some(*DYDX_VENUE),
5230 UUID4::new(),
5231 get_atomic_clock_realtime().get_time_ns(),
5232 None,
5233 );
5234
5235 assert!(client.request_instruments(&request).is_ok());
5236
5237 let timeout = tokio::time::Duration::from_secs(3);
5239 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5240 tokio::time::timeout(timeout, rx.recv()).await
5241 {
5242 assert!(resp.data.is_empty(), "Expected empty response on 503");
5243 }
5244 }
5245
5246 #[tokio::test]
5247 async fn test_http_429_rate_limit_handling() {
5248 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5250 set_data_event_sender(sender);
5251
5252 let client_id = ClientId::from("DYDX-429");
5253 let config = DydxDataClientConfig {
5254 base_url_http: Some("http://httpstat.us/429".to_string()),
5255 http_timeout_secs: Some(2),
5256 ..Default::default()
5257 };
5258
5259 let http_client = DydxHttpClient::new(
5260 config.base_url_http.clone(),
5261 config.http_timeout_secs,
5262 config.http_proxy_url.clone(),
5263 config.is_testnet,
5264 None,
5265 )
5266 .unwrap();
5267
5268 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5269
5270 let instrument = create_test_instrument_any();
5271 let instrument_id = instrument.id();
5272 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5273 client.instruments.insert(symbol_key, instrument);
5274
5275 let request = RequestTrades::new(
5276 instrument_id,
5277 None,
5278 None,
5279 None,
5280 Some(client_id),
5281 UUID4::new(),
5282 get_atomic_clock_realtime().get_time_ns(),
5283 None,
5284 );
5285
5286 assert!(client.request_trades(&request).is_ok());
5287
5288 let timeout = tokio::time::Duration::from_secs(3);
5290 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5291 tokio::time::timeout(timeout, rx.recv()).await
5292 {
5293 assert!(
5294 resp.data.is_empty(),
5295 "Expected empty response on rate limit"
5296 );
5297 }
5298 }
5299
5300 #[tokio::test]
5301 async fn test_error_handling_does_not_panic() {
5302 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5304 set_data_event_sender(sender);
5305
5306 let client_id = ClientId::from("DYDX-NO-PANIC");
5307 let config = DydxDataClientConfig {
5308 base_url_http: Some("http://invalid".to_string()),
5309 ..Default::default()
5310 };
5311
5312 let http_client = DydxHttpClient::new(
5313 config.base_url_http.clone(),
5314 config.http_timeout_secs,
5315 config.http_proxy_url.clone(),
5316 config.is_testnet,
5317 None,
5318 )
5319 .unwrap();
5320
5321 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5322
5323 let request_instruments = RequestInstruments::new(
5325 None,
5326 None,
5327 Some(client_id),
5328 Some(*DYDX_VENUE),
5329 UUID4::new(),
5330 get_atomic_clock_realtime().get_time_ns(),
5331 None,
5332 );
5333 assert!(client.request_instruments(&request_instruments).is_ok());
5334
5335 let instrument_id = InstrumentId::from("INVALID.DYDX");
5336 let request_instrument = RequestInstrument::new(
5337 instrument_id,
5338 None,
5339 None,
5340 Some(client_id),
5341 UUID4::new(),
5342 get_atomic_clock_realtime().get_time_ns(),
5343 None,
5344 );
5345 assert!(client.request_instrument(&request_instrument).is_ok());
5346
5347 let request_trades = RequestTrades::new(
5348 instrument_id,
5349 None,
5350 None,
5351 None,
5352 Some(client_id),
5353 UUID4::new(),
5354 get_atomic_clock_realtime().get_time_ns(),
5355 None,
5356 );
5357 assert!(client.request_trades(&request_trades).is_ok());
5358 }
5359
5360 #[tokio::test]
5361 async fn test_malformed_json_response() {
5362 use axum::{Router, routing::get};
5364
5365 #[derive(Clone)]
5366 struct MalformedState;
5367
5368 async fn malformed_markets_handler() -> String {
5369 r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5371 }
5372
5373 let app = Router::new()
5374 .route("/v4/markets", get(malformed_markets_handler))
5375 .with_state(MalformedState);
5376
5377 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5378 let server_addr = listener.local_addr().unwrap();
5379 let port = server_addr.port();
5380
5381 tokio::spawn(async move {
5382 axum::serve(listener, app).await.unwrap();
5383 });
5384
5385 wait_for_server(server_addr).await;
5386
5387 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5388 set_data_event_sender(sender);
5389
5390 let client_id = ClientId::from("DYDX-MALFORMED");
5391 let config = DydxDataClientConfig {
5392 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5393 http_timeout_secs: Some(2),
5394 ..Default::default()
5395 };
5396
5397 let http_client = DydxHttpClient::new(
5398 config.base_url_http.clone(),
5399 config.http_timeout_secs,
5400 config.http_proxy_url.clone(),
5401 config.is_testnet,
5402 None,
5403 )
5404 .unwrap();
5405
5406 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5407
5408 let request = RequestInstruments::new(
5409 None,
5410 None,
5411 Some(client_id),
5412 Some(*DYDX_VENUE),
5413 UUID4::new(),
5414 get_atomic_clock_realtime().get_time_ns(),
5415 None,
5416 );
5417
5418 assert!(client.request_instruments(&request).is_ok());
5419
5420 let timeout = tokio::time::Duration::from_secs(3);
5422 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5423 tokio::time::timeout(timeout, rx.recv()).await
5424 {
5425 assert!(
5426 resp.data.is_empty(),
5427 "Expected empty response on malformed JSON"
5428 );
5429 }
5430 }
5431
5432 #[tokio::test]
5433 async fn test_missing_required_fields_in_response() {
5434 use axum::{Json, Router, routing::get};
5436 use serde_json::{Value, json};
5437
5438 #[derive(Clone)]
5439 struct MissingFieldsState;
5440
5441 async fn missing_fields_handler() -> Json<Value> {
5442 Json(json!({
5444 "markets": {
5445 "BTC-USD": {
5446 "status": "ACTIVE",
5448 "baseAsset": "BTC",
5449 "quoteAsset": "USD",
5450 }
5452 }
5453 }))
5454 }
5455
5456 let app = Router::new()
5457 .route("/v4/markets", get(missing_fields_handler))
5458 .with_state(MissingFieldsState);
5459
5460 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5461 let server_addr = listener.local_addr().unwrap();
5462 let port = server_addr.port();
5463
5464 tokio::spawn(async move {
5465 axum::serve(listener, app).await.unwrap();
5466 });
5467
5468 wait_for_server(server_addr).await;
5469
5470 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5471 set_data_event_sender(sender);
5472
5473 let client_id = ClientId::from("DYDX-MISSING");
5474 let config = DydxDataClientConfig {
5475 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5476 http_timeout_secs: Some(2),
5477 ..Default::default()
5478 };
5479
5480 let http_client = DydxHttpClient::new(
5481 config.base_url_http.clone(),
5482 config.http_timeout_secs,
5483 config.http_proxy_url.clone(),
5484 config.is_testnet,
5485 None,
5486 )
5487 .unwrap();
5488
5489 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5490
5491 let request = RequestInstruments::new(
5492 None,
5493 None,
5494 Some(client_id),
5495 Some(*DYDX_VENUE),
5496 UUID4::new(),
5497 get_atomic_clock_realtime().get_time_ns(),
5498 None,
5499 );
5500
5501 assert!(client.request_instruments(&request).is_ok());
5502
5503 let timeout = tokio::time::Duration::from_secs(3);
5505 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5506 tokio::time::timeout(timeout, rx.recv()).await
5507 {
5508 assert!(resp.correlation_id == request.request_id);
5511 }
5512 }
5513
5514 #[tokio::test]
5515 async fn test_invalid_data_types_in_response() {
5516 use axum::{Json, Router, routing::get};
5518 use serde_json::{Value, json};
5519
5520 #[derive(Clone)]
5521 struct InvalidTypesState;
5522
5523 async fn invalid_types_handler() -> Json<Value> {
5524 Json(json!({
5526 "markets": {
5527 "BTC-USD": {
5528 "ticker": "BTC-USD",
5529 "status": "ACTIVE",
5530 "baseAsset": "BTC",
5531 "quoteAsset": "USD",
5532 "stepSize": "not_a_number", "tickSize": true, "minOrderSize": ["array"], "market": 12345, }
5537 }
5538 }))
5539 }
5540
5541 let app = Router::new()
5542 .route("/v4/markets", get(invalid_types_handler))
5543 .with_state(InvalidTypesState);
5544
5545 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5546 let server_addr = listener.local_addr().unwrap();
5547 let port = server_addr.port();
5548
5549 tokio::spawn(async move {
5550 axum::serve(listener, app).await.unwrap();
5551 });
5552
5553 wait_for_server(server_addr).await;
5554
5555 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5556 set_data_event_sender(sender);
5557
5558 let client_id = ClientId::from("DYDX-TYPES");
5559 let config = DydxDataClientConfig {
5560 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5561 http_timeout_secs: Some(2),
5562 ..Default::default()
5563 };
5564
5565 let http_client = DydxHttpClient::new(
5566 config.base_url_http.clone(),
5567 config.http_timeout_secs,
5568 config.http_proxy_url.clone(),
5569 config.is_testnet,
5570 None,
5571 )
5572 .unwrap();
5573
5574 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5575
5576 let request = RequestInstruments::new(
5577 None,
5578 None,
5579 Some(client_id),
5580 Some(*DYDX_VENUE),
5581 UUID4::new(),
5582 get_atomic_clock_realtime().get_time_ns(),
5583 None,
5584 );
5585
5586 assert!(client.request_instruments(&request).is_ok());
5587
5588 let timeout = tokio::time::Duration::from_secs(3);
5590 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5591 tokio::time::timeout(timeout, rx.recv()).await
5592 {
5593 assert!(resp.correlation_id == request.request_id);
5595 }
5596 }
5597
5598 #[tokio::test]
5599 async fn test_unexpected_response_structure() {
5600 use axum::{Json, Router, routing::get};
5602 use serde_json::{Value, json};
5603
5604 #[derive(Clone)]
5605 struct UnexpectedState;
5606
5607 async fn unexpected_structure_handler() -> Json<Value> {
5608 Json(json!({
5610 "error": "Something went wrong",
5611 "code": 500,
5612 "data": null,
5613 "unexpected_field": {
5614 "nested": {
5615 "deeply": [1, 2, 3]
5616 }
5617 }
5618 }))
5619 }
5620
5621 let app = Router::new()
5622 .route("/v4/markets", get(unexpected_structure_handler))
5623 .with_state(UnexpectedState);
5624
5625 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5626 let server_addr = listener.local_addr().unwrap();
5627 let port = server_addr.port();
5628
5629 tokio::spawn(async move {
5630 axum::serve(listener, app).await.unwrap();
5631 });
5632
5633 wait_for_server(server_addr).await;
5634
5635 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5636 set_data_event_sender(sender);
5637
5638 let client_id = ClientId::from("DYDX-STRUCT");
5639 let config = DydxDataClientConfig {
5640 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5641 http_timeout_secs: Some(2),
5642 ..Default::default()
5643 };
5644
5645 let http_client = DydxHttpClient::new(
5646 config.base_url_http.clone(),
5647 config.http_timeout_secs,
5648 config.http_proxy_url.clone(),
5649 config.is_testnet,
5650 None,
5651 )
5652 .unwrap();
5653
5654 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5655
5656 let request = RequestInstruments::new(
5657 None,
5658 None,
5659 Some(client_id),
5660 Some(*DYDX_VENUE),
5661 UUID4::new(),
5662 get_atomic_clock_realtime().get_time_ns(),
5663 None,
5664 );
5665
5666 assert!(client.request_instruments(&request).is_ok());
5667
5668 let timeout = tokio::time::Duration::from_secs(3);
5670 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5671 tokio::time::timeout(timeout, rx.recv()).await
5672 {
5673 assert!(
5674 resp.data.is_empty(),
5675 "Expected empty response on unexpected structure"
5676 );
5677 assert!(resp.correlation_id == request.request_id);
5678 }
5679 }
5680
5681 #[tokio::test]
5682 async fn test_empty_markets_object_in_response() {
5683 use axum::{Json, Router, routing::get};
5685 use serde_json::{Value, json};
5686
5687 #[derive(Clone)]
5688 struct EmptyMarketsState;
5689
5690 async fn empty_markets_handler() -> Json<Value> {
5691 Json(json!({
5692 "markets": {}
5693 }))
5694 }
5695
5696 let app = Router::new()
5697 .route("/v4/markets", get(empty_markets_handler))
5698 .with_state(EmptyMarketsState);
5699
5700 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5701 let server_addr = listener.local_addr().unwrap();
5702 let port = server_addr.port();
5703
5704 tokio::spawn(async move {
5705 axum::serve(listener, app).await.unwrap();
5706 });
5707
5708 wait_for_server(server_addr).await;
5709
5710 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5711 set_data_event_sender(sender);
5712
5713 let client_id = ClientId::from("DYDX-EMPTY");
5714 let config = DydxDataClientConfig {
5715 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5716 http_timeout_secs: Some(2),
5717 ..Default::default()
5718 };
5719
5720 let http_client = DydxHttpClient::new(
5721 config.base_url_http.clone(),
5722 config.http_timeout_secs,
5723 config.http_proxy_url.clone(),
5724 config.is_testnet,
5725 None,
5726 )
5727 .unwrap();
5728
5729 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5730
5731 let request = RequestInstruments::new(
5732 None,
5733 None,
5734 Some(client_id),
5735 Some(*DYDX_VENUE),
5736 UUID4::new(),
5737 get_atomic_clock_realtime().get_time_ns(),
5738 None,
5739 );
5740
5741 assert!(client.request_instruments(&request).is_ok());
5742
5743 let timeout = tokio::time::Duration::from_secs(3);
5745 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5746 tokio::time::timeout(timeout, rx.recv()).await
5747 {
5748 assert!(
5749 resp.data.is_empty(),
5750 "Expected empty response for empty markets"
5751 );
5752 assert!(resp.correlation_id == request.request_id);
5753 }
5754 }
5755
5756 #[tokio::test]
5757 async fn test_null_values_in_response() {
5758 use axum::{Json, Router, routing::get};
5760 use serde_json::{Value, json};
5761
5762 #[derive(Clone)]
5763 struct NullValuesState;
5764
5765 async fn null_values_handler() -> Json<Value> {
5766 Json(json!({
5767 "markets": {
5768 "BTC-USD": {
5769 "ticker": null,
5770 "status": "ACTIVE",
5771 "baseAsset": null,
5772 "quoteAsset": "USD",
5773 "stepSize": null,
5774 }
5775 }
5776 }))
5777 }
5778
5779 let app = Router::new()
5780 .route("/v4/markets", get(null_values_handler))
5781 .with_state(NullValuesState);
5782
5783 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
5784 let server_addr = listener.local_addr().unwrap();
5785 let port = server_addr.port();
5786
5787 tokio::spawn(async move {
5788 axum::serve(listener, app).await.unwrap();
5789 });
5790
5791 wait_for_server(server_addr).await;
5792
5793 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5794 set_data_event_sender(sender);
5795
5796 let client_id = ClientId::from("DYDX-NULL");
5797 let config = DydxDataClientConfig {
5798 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5799 http_timeout_secs: Some(2),
5800 ..Default::default()
5801 };
5802
5803 let http_client = DydxHttpClient::new(
5804 config.base_url_http.clone(),
5805 config.http_timeout_secs,
5806 config.http_proxy_url.clone(),
5807 config.is_testnet,
5808 None,
5809 )
5810 .unwrap();
5811
5812 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5813
5814 let request = RequestInstruments::new(
5815 None,
5816 None,
5817 Some(client_id),
5818 Some(*DYDX_VENUE),
5819 UUID4::new(),
5820 get_atomic_clock_realtime().get_time_ns(),
5821 None,
5822 );
5823
5824 assert!(client.request_instruments(&request).is_ok());
5825
5826 let timeout = tokio::time::Duration::from_secs(3);
5828 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5829 tokio::time::timeout(timeout, rx.recv()).await
5830 {
5831 assert!(resp.correlation_id == request.request_id);
5833 }
5834 }
5835
5836 #[tokio::test]
5837 async fn test_invalid_instrument_id_format() {
5838 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5840 set_data_event_sender(sender);
5841
5842 let client_id = ClientId::from("DYDX-INVALID-ID");
5843 let config = DydxDataClientConfig::default();
5844
5845 let http_client = DydxHttpClient::new(
5846 config.base_url_http.clone(),
5847 config.http_timeout_secs,
5848 config.http_proxy_url.clone(),
5849 config.is_testnet,
5850 None,
5851 )
5852 .unwrap();
5853
5854 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5855
5856 let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5858
5859 let request = RequestInstrument::new(
5860 non_existent_id,
5861 None,
5862 None,
5863 Some(client_id),
5864 UUID4::new(),
5865 get_atomic_clock_realtime().get_time_ns(),
5866 None,
5867 );
5868
5869 assert!(client.request_instrument(&request).is_ok());
5870
5871 let timeout = tokio::time::Duration::from_secs(2);
5873 let result = tokio::time::timeout(timeout, rx.recv()).await;
5874
5875 match result {
5877 Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5878 }
5880 Ok(None) | Err(_) => {
5881 }
5883 _ => {}
5884 }
5885 }
5886
5887 #[tokio::test]
5888 async fn test_invalid_date_range_end_before_start() {
5889 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5891 set_data_event_sender(sender);
5892
5893 let client_id = ClientId::from("DYDX-DATE-RANGE");
5894 let config = DydxDataClientConfig::default();
5895
5896 let http_client = DydxHttpClient::new(
5897 config.base_url_http.clone(),
5898 config.http_timeout_secs,
5899 config.http_proxy_url.clone(),
5900 config.is_testnet,
5901 None,
5902 )
5903 .unwrap();
5904
5905 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5906
5907 let instrument = create_test_instrument_any();
5908 let instrument_id = instrument.id();
5909 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5910 client.instruments.insert(symbol_key, instrument);
5911
5912 let start = Utc::now();
5914 let end = start - chrono::Duration::hours(24); let request = RequestTrades::new(
5917 instrument_id,
5918 Some(start),
5919 Some(end),
5920 None,
5921 Some(client_id),
5922 UUID4::new(),
5923 get_atomic_clock_realtime().get_time_ns(),
5924 None,
5925 );
5926
5927 assert!(client.request_trades(&request).is_ok());
5928
5929 let timeout = tokio::time::Duration::from_secs(2);
5931 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5932 tokio::time::timeout(timeout, rx.recv()).await
5933 {
5934 assert!(resp.correlation_id == request.request_id);
5936 }
5937 }
5938
5939 #[tokio::test]
5940 async fn test_negative_limit_value() {
5941 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5944 set_data_event_sender(sender);
5945
5946 let client_id = ClientId::from("DYDX-NEG-LIMIT");
5947 let config = DydxDataClientConfig::default();
5948
5949 let http_client = DydxHttpClient::new(
5950 config.base_url_http.clone(),
5951 config.http_timeout_secs,
5952 config.http_proxy_url.clone(),
5953 config.is_testnet,
5954 None,
5955 )
5956 .unwrap();
5957
5958 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5959
5960 let instrument = create_test_instrument_any();
5961 let instrument_id = instrument.id();
5962 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5963 client.instruments.insert(symbol_key, instrument);
5964
5965 let request = RequestTrades::new(
5967 instrument_id,
5968 None,
5969 None,
5970 std::num::NonZeroUsize::new(1), Some(client_id),
5972 UUID4::new(),
5973 get_atomic_clock_realtime().get_time_ns(),
5974 None,
5975 );
5976
5977 assert!(client.request_trades(&request).is_ok());
5979 }
5980
5981 #[tokio::test]
5982 async fn test_zero_limit_value() {
5983 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5986 set_data_event_sender(sender);
5987
5988 let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5989 let config = DydxDataClientConfig::default();
5990
5991 let http_client = DydxHttpClient::new(
5992 config.base_url_http.clone(),
5993 config.http_timeout_secs,
5994 config.http_proxy_url.clone(),
5995 config.is_testnet,
5996 None,
5997 )
5998 .unwrap();
5999
6000 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6001
6002 let instrument = create_test_instrument_any();
6003 let instrument_id = instrument.id();
6004 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6005 client.instruments.insert(symbol_key, instrument);
6006
6007 let request = RequestTrades::new(
6008 instrument_id,
6009 None,
6010 None,
6011 None, Some(client_id),
6013 UUID4::new(),
6014 get_atomic_clock_realtime().get_time_ns(),
6015 None,
6016 );
6017
6018 assert!(client.request_trades(&request).is_ok());
6019
6020 let timeout = tokio::time::Duration::from_secs(2);
6022 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6023 tokio::time::timeout(timeout, rx.recv()).await
6024 {
6025 assert!(resp.correlation_id == request.request_id);
6026 }
6027 }
6028
6029 #[tokio::test]
6030 async fn test_very_large_limit_value() {
6031 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6033 set_data_event_sender(sender);
6034
6035 let client_id = ClientId::from("DYDX-LARGE-LIMIT");
6036 let config = DydxDataClientConfig::default();
6037
6038 let http_client = DydxHttpClient::new(
6039 config.base_url_http.clone(),
6040 config.http_timeout_secs,
6041 config.http_proxy_url.clone(),
6042 config.is_testnet,
6043 None,
6044 )
6045 .unwrap();
6046
6047 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6048
6049 let instrument = create_test_instrument_any();
6050 let instrument_id = instrument.id();
6051 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6052 client.instruments.insert(symbol_key, instrument);
6053
6054 let request = RequestTrades::new(
6055 instrument_id,
6056 None,
6057 None,
6058 std::num::NonZeroUsize::new(1_000_000), Some(client_id),
6060 UUID4::new(),
6061 get_atomic_clock_realtime().get_time_ns(),
6062 None,
6063 );
6064
6065 assert!(client.request_trades(&request).is_ok());
6067
6068 let timeout = tokio::time::Duration::from_secs(2);
6070 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6071 tokio::time::timeout(timeout, rx.recv()).await
6072 {
6073 assert!(resp.correlation_id == request.request_id);
6074 }
6075 }
6076
6077 #[tokio::test]
6078 async fn test_none_limit_uses_default() {
6079 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6081 set_data_event_sender(sender);
6082
6083 let client_id = ClientId::from("DYDX-NONE-LIMIT");
6084 let config = DydxDataClientConfig::default();
6085
6086 let http_client = DydxHttpClient::new(
6087 config.base_url_http.clone(),
6088 config.http_timeout_secs,
6089 config.http_proxy_url.clone(),
6090 config.is_testnet,
6091 None,
6092 )
6093 .unwrap();
6094
6095 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6096
6097 let instrument = create_test_instrument_any();
6098 let instrument_id = instrument.id();
6099 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6100 client.instruments.insert(symbol_key, instrument);
6101
6102 let request = RequestTrades::new(
6103 instrument_id,
6104 None,
6105 None,
6106 None, Some(client_id),
6108 UUID4::new(),
6109 get_atomic_clock_realtime().get_time_ns(),
6110 None,
6111 );
6112
6113 assert!(client.request_trades(&request).is_ok());
6115
6116 let timeout = tokio::time::Duration::from_secs(2);
6117 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6118 tokio::time::timeout(timeout, rx.recv()).await
6119 {
6120 assert!(resp.correlation_id == request.request_id);
6121 }
6122 }
6123
6124 #[tokio::test]
6125 async fn test_validation_does_not_panic() {
6126 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6128 set_data_event_sender(sender);
6129
6130 let client_id = ClientId::from("DYDX-VALIDATION");
6131 let config = DydxDataClientConfig::default();
6132
6133 let http_client = DydxHttpClient::new(
6134 config.base_url_http.clone(),
6135 config.http_timeout_secs,
6136 config.http_proxy_url.clone(),
6137 config.is_testnet,
6138 None,
6139 )
6140 .unwrap();
6141
6142 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6143
6144 let instrument = create_test_instrument_any();
6145 let instrument_id = instrument.id();
6146 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6147 client.instruments.insert(symbol_key, instrument);
6148
6149 let invalid_id = InstrumentId::from("INVALID.WRONG");
6151 let req1 = RequestInstrument::new(
6152 invalid_id,
6153 None,
6154 None,
6155 Some(client_id),
6156 UUID4::new(),
6157 get_atomic_clock_realtime().get_time_ns(),
6158 None,
6159 );
6160 assert!(client.request_instrument(&req1).is_ok());
6161
6162 let start = Utc::now();
6164 let end = start - chrono::Duration::hours(1);
6165 let req2 = RequestTrades::new(
6166 instrument_id,
6167 Some(start),
6168 Some(end),
6169 None,
6170 Some(client_id),
6171 UUID4::new(),
6172 get_atomic_clock_realtime().get_time_ns(),
6173 None,
6174 );
6175 assert!(client.request_trades(&req2).is_ok());
6176
6177 let req3 = RequestTrades::new(
6179 instrument_id,
6180 None,
6181 None,
6182 std::num::NonZeroUsize::new(1),
6183 Some(client_id),
6184 UUID4::new(),
6185 get_atomic_clock_realtime().get_time_ns(),
6186 None,
6187 );
6188 assert!(client.request_trades(&req3).is_ok());
6189
6190 let req4 = RequestTrades::new(
6192 instrument_id,
6193 None,
6194 None,
6195 std::num::NonZeroUsize::new(usize::MAX),
6196 Some(client_id),
6197 UUID4::new(),
6198 get_atomic_clock_realtime().get_time_ns(),
6199 None,
6200 );
6201 assert!(client.request_trades(&req4).is_ok());
6202
6203 }
6205
6206 #[tokio::test]
6207 async fn test_instruments_response_has_correct_venue() {
6208 use axum::{Json, Router, routing::get};
6210 use serde_json::{Value, json};
6211
6212 #[derive(Clone)]
6213 struct VenueTestState;
6214
6215 async fn venue_handler() -> Json<Value> {
6216 Json(json!({
6217 "markets": {
6218 "BTC-USD": {
6219 "ticker": "BTC-USD",
6220 "status": "ACTIVE",
6221 "baseAsset": "BTC",
6222 "quoteAsset": "USD",
6223 "stepSize": "0.0001",
6224 "tickSize": "1",
6225 "indexPrice": "50000",
6226 "oraclePrice": "50000",
6227 "priceChange24H": "1000",
6228 "nextFundingRate": "0.0001",
6229 "nextFundingAt": "2024-01-01T00:00:00.000Z",
6230 "minOrderSize": "0.001",
6231 "type": "PERPETUAL",
6232 "initialMarginFraction": "0.05",
6233 "maintenanceMarginFraction": "0.03",
6234 "volume24H": "1000000",
6235 "trades24H": "10000",
6236 "openInterest": "5000000",
6237 "incrementalInitialMarginFraction": "0.01",
6238 "incrementalPositionSize": "10",
6239 "maxPositionSize": "1000",
6240 "baselinePositionSize": "100",
6241 "assetResolution": "10000000000"
6242 }
6243 }
6244 }))
6245 }
6246
6247 let app = Router::new()
6248 .route("/v4/markets", get(venue_handler))
6249 .with_state(VenueTestState);
6250
6251 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
6252 let server_addr = listener.local_addr().unwrap();
6253 let port = server_addr.port();
6254
6255 tokio::spawn(async move {
6256 axum::serve(listener, app).await.unwrap();
6257 });
6258
6259 wait_for_server(server_addr).await;
6260
6261 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6262 set_data_event_sender(sender);
6263
6264 let client_id = ClientId::from("DYDX-VENUE-TEST");
6265 let config = DydxDataClientConfig {
6266 base_url_http: Some(format!("http://127.0.0.1:{port}")),
6267 http_timeout_secs: Some(2),
6268 ..Default::default()
6269 };
6270
6271 let http_client = DydxHttpClient::new(
6272 config.base_url_http.clone(),
6273 config.http_timeout_secs,
6274 config.http_proxy_url.clone(),
6275 config.is_testnet,
6276 None,
6277 )
6278 .unwrap();
6279
6280 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6281
6282 let request = RequestInstruments::new(
6283 None,
6284 None,
6285 Some(client_id),
6286 Some(*DYDX_VENUE),
6287 UUID4::new(),
6288 get_atomic_clock_realtime().get_time_ns(),
6289 None,
6290 );
6291
6292 assert!(client.request_instruments(&request).is_ok());
6293
6294 let timeout = tokio::time::Duration::from_secs(3);
6295 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6296 tokio::time::timeout(timeout, rx.recv()).await
6297 {
6298 assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6300 }
6301 }
6302
6303 #[tokio::test]
6304 async fn test_instruments_response_contains_vec_instrument_any() {
6305 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6307 set_data_event_sender(sender);
6308
6309 let client_id = ClientId::from("DYDX-VEC-TEST");
6310 let config = DydxDataClientConfig::default();
6311
6312 let http_client = DydxHttpClient::new(
6313 config.base_url_http.clone(),
6314 config.http_timeout_secs,
6315 config.http_proxy_url.clone(),
6316 config.is_testnet,
6317 None,
6318 )
6319 .unwrap();
6320
6321 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6322
6323 let request = RequestInstruments::new(
6324 None,
6325 None,
6326 Some(client_id),
6327 Some(*DYDX_VENUE),
6328 UUID4::new(),
6329 get_atomic_clock_realtime().get_time_ns(),
6330 None,
6331 );
6332
6333 assert!(client.request_instruments(&request).is_ok());
6334
6335 let timeout = tokio::time::Duration::from_secs(2);
6336 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6337 tokio::time::timeout(timeout, rx.recv()).await
6338 {
6339 assert!(
6341 resp.data.is_empty() || !resp.data.is_empty(),
6342 "data should be Vec<InstrumentAny>"
6343 );
6344 }
6345 }
6346
6347 #[tokio::test]
6348 async fn test_instruments_response_includes_correlation_id() {
6349 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6351 set_data_event_sender(sender);
6352
6353 let client_id = ClientId::from("DYDX-CORR-TEST");
6354 let config = DydxDataClientConfig::default();
6355
6356 let http_client = DydxHttpClient::new(
6357 config.base_url_http.clone(),
6358 config.http_timeout_secs,
6359 config.http_proxy_url.clone(),
6360 config.is_testnet,
6361 None,
6362 )
6363 .unwrap();
6364
6365 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6366
6367 let request_id = UUID4::new();
6368 let request = RequestInstruments::new(
6369 None,
6370 None,
6371 Some(client_id),
6372 Some(*DYDX_VENUE),
6373 request_id,
6374 get_atomic_clock_realtime().get_time_ns(),
6375 None,
6376 );
6377
6378 assert!(client.request_instruments(&request).is_ok());
6379
6380 let timeout = tokio::time::Duration::from_secs(2);
6381 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6382 tokio::time::timeout(timeout, rx.recv()).await
6383 {
6384 assert_eq!(
6386 resp.correlation_id, request_id,
6387 "correlation_id should match request_id"
6388 );
6389 }
6390 }
6391
6392 #[tokio::test]
6393 async fn test_instruments_response_includes_client_id() {
6394 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6396 set_data_event_sender(sender);
6397
6398 let client_id = ClientId::from("DYDX-CLIENT-TEST");
6399 let config = DydxDataClientConfig::default();
6400
6401 let http_client = DydxHttpClient::new(
6402 config.base_url_http.clone(),
6403 config.http_timeout_secs,
6404 config.http_proxy_url.clone(),
6405 config.is_testnet,
6406 None,
6407 )
6408 .unwrap();
6409
6410 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6411
6412 let request = RequestInstruments::new(
6413 None,
6414 None,
6415 Some(client_id),
6416 Some(*DYDX_VENUE),
6417 UUID4::new(),
6418 get_atomic_clock_realtime().get_time_ns(),
6419 None,
6420 );
6421
6422 assert!(client.request_instruments(&request).is_ok());
6423
6424 let timeout = tokio::time::Duration::from_secs(2);
6425 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6426 tokio::time::timeout(timeout, rx.recv()).await
6427 {
6428 assert_eq!(
6430 resp.client_id, client_id,
6431 "client_id should be included in response"
6432 );
6433 }
6434 }
6435
6436 #[tokio::test]
6437 async fn test_instruments_response_includes_timestamps() {
6438 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6440 set_data_event_sender(sender);
6441
6442 let client_id = ClientId::from("DYDX-TS-TEST");
6443 let config = DydxDataClientConfig::default();
6444
6445 let http_client = DydxHttpClient::new(
6446 config.base_url_http.clone(),
6447 config.http_timeout_secs,
6448 config.http_proxy_url.clone(),
6449 config.is_testnet,
6450 None,
6451 )
6452 .unwrap();
6453
6454 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6455
6456 let start = Some(Utc::now() - chrono::Duration::days(1));
6457 let end = Some(Utc::now());
6458 let ts_init = get_atomic_clock_realtime().get_time_ns();
6459
6460 let request = RequestInstruments::new(
6461 start,
6462 end,
6463 Some(client_id),
6464 Some(*DYDX_VENUE),
6465 UUID4::new(),
6466 ts_init,
6467 None,
6468 );
6469
6470 assert!(client.request_instruments(&request).is_ok());
6471
6472 let timeout = tokio::time::Duration::from_secs(2);
6473 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6474 tokio::time::timeout(timeout, rx.recv()).await
6475 {
6476 assert!(
6478 resp.start.is_some() || resp.start.is_none(),
6479 "start timestamp field exists"
6480 );
6481 assert!(
6482 resp.end.is_some() || resp.end.is_none(),
6483 "end timestamp field exists"
6484 );
6485 assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6486 }
6487 }
6488
6489 #[tokio::test]
6490 async fn test_instruments_response_includes_params_when_provided() {
6491 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6493 set_data_event_sender(sender);
6494
6495 let client_id = ClientId::from("DYDX-PARAMS-TEST");
6496 let config = DydxDataClientConfig::default();
6497
6498 let http_client = DydxHttpClient::new(
6499 config.base_url_http.clone(),
6500 config.http_timeout_secs,
6501 config.http_proxy_url.clone(),
6502 config.is_testnet,
6503 None,
6504 )
6505 .unwrap();
6506
6507 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6508
6509 let request = RequestInstruments::new(
6512 None,
6513 None,
6514 Some(client_id),
6515 Some(*DYDX_VENUE),
6516 UUID4::new(),
6517 get_atomic_clock_realtime().get_time_ns(),
6518 None, );
6520
6521 assert!(client.request_instruments(&request).is_ok());
6522
6523 let timeout = tokio::time::Duration::from_secs(2);
6524 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6525 tokio::time::timeout(timeout, rx.recv()).await
6526 {
6527 let _params = resp.params;
6529 }
6530 }
6531
6532 #[tokio::test]
6533 async fn test_instruments_response_params_none_when_not_provided() {
6534 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6536 set_data_event_sender(sender);
6537
6538 let client_id = ClientId::from("DYDX-NO-PARAMS");
6539 let config = DydxDataClientConfig::default();
6540
6541 let http_client = DydxHttpClient::new(
6542 config.base_url_http.clone(),
6543 config.http_timeout_secs,
6544 config.http_proxy_url.clone(),
6545 config.is_testnet,
6546 None,
6547 )
6548 .unwrap();
6549
6550 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6551
6552 let request = RequestInstruments::new(
6553 None,
6554 None,
6555 Some(client_id),
6556 Some(*DYDX_VENUE),
6557 UUID4::new(),
6558 get_atomic_clock_realtime().get_time_ns(),
6559 None, );
6561
6562 assert!(client.request_instruments(&request).is_ok());
6563
6564 let timeout = tokio::time::Duration::from_secs(2);
6565 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6566 tokio::time::timeout(timeout, rx.recv()).await
6567 {
6568 assert!(
6570 resp.params.is_none(),
6571 "params should be None when not provided"
6572 );
6573 }
6574 }
6575
6576 #[tokio::test]
6577 async fn test_instruments_response_complete_structure() {
6578 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6580 set_data_event_sender(sender);
6581
6582 let client_id = ClientId::from("DYDX-FULL-TEST");
6583 let config = DydxDataClientConfig::default();
6584
6585 let http_client = DydxHttpClient::new(
6586 config.base_url_http.clone(),
6587 config.http_timeout_secs,
6588 config.http_proxy_url.clone(),
6589 config.is_testnet,
6590 None,
6591 )
6592 .unwrap();
6593
6594 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6595
6596 let request_id = UUID4::new();
6597 let start = Some(Utc::now() - chrono::Duration::hours(1));
6598 let end = Some(Utc::now());
6599 let ts_init = get_atomic_clock_realtime().get_time_ns();
6600
6601 let request = RequestInstruments::new(
6602 start,
6603 end,
6604 Some(client_id),
6605 Some(*DYDX_VENUE),
6606 request_id,
6607 ts_init,
6608 None,
6609 );
6610
6611 assert!(client.request_instruments(&request).is_ok());
6612
6613 let timeout = tokio::time::Duration::from_secs(2);
6614 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6615 tokio::time::timeout(timeout, rx.recv()).await
6616 {
6617 assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6619 assert_eq!(
6620 resp.correlation_id, request_id,
6621 "correlation_id should match"
6622 );
6623 assert_eq!(resp.client_id, client_id, "client_id should match");
6624 assert!(resp.ts_init > 0, "ts_init should be set");
6625
6626 let _data: Vec<InstrumentAny> = resp.data;
6628
6629 let _start = resp.start;
6631 let _end = resp.end;
6632 let _params = resp.params;
6633 }
6634 }
6635
6636 #[tokio::test]
6637 async fn test_instrument_response_properly_boxed() {
6638 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6640 set_data_event_sender(sender);
6641
6642 let client_id = ClientId::from("DYDX-BOXED-TEST");
6643 let config = DydxDataClientConfig::default();
6644
6645 let http_client = DydxHttpClient::new(
6646 config.base_url_http.clone(),
6647 config.http_timeout_secs,
6648 config.http_proxy_url.clone(),
6649 config.is_testnet,
6650 None,
6651 )
6652 .unwrap();
6653
6654 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6655
6656 let instrument = create_test_instrument_any();
6657 let instrument_id = instrument.id();
6658 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6659 client.instruments.insert(symbol_key, instrument);
6660
6661 let request = RequestInstrument::new(
6662 instrument_id,
6663 None,
6664 None,
6665 Some(client_id),
6666 UUID4::new(),
6667 get_atomic_clock_realtime().get_time_ns(),
6668 None,
6669 );
6670
6671 assert!(client.request_instrument(&request).is_ok());
6672
6673 let timeout = tokio::time::Duration::from_secs(2);
6674 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6675 tokio::time::timeout(timeout, rx.recv()).await
6676 {
6677 let _response: Box<InstrumentResponse> = boxed_resp;
6679 }
6681 }
6682
6683 #[tokio::test]
6684 async fn test_instrument_response_contains_single_instrument() {
6685 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6687 set_data_event_sender(sender);
6688
6689 let client_id = ClientId::from("DYDX-SINGLE-TEST");
6690 let config = DydxDataClientConfig::default();
6691
6692 let http_client = DydxHttpClient::new(
6693 config.base_url_http.clone(),
6694 config.http_timeout_secs,
6695 config.http_proxy_url.clone(),
6696 config.is_testnet,
6697 None,
6698 )
6699 .unwrap();
6700
6701 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6702
6703 let instrument = create_test_instrument_any();
6704 let instrument_id = instrument.id();
6705 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6706 client.instruments.insert(symbol_key, instrument.clone());
6707
6708 let request = RequestInstrument::new(
6709 instrument_id,
6710 None,
6711 None,
6712 Some(client_id),
6713 UUID4::new(),
6714 get_atomic_clock_realtime().get_time_ns(),
6715 None,
6716 );
6717
6718 assert!(client.request_instrument(&request).is_ok());
6719
6720 let timeout = tokio::time::Duration::from_secs(2);
6721 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6722 tokio::time::timeout(timeout, rx.recv()).await
6723 {
6724 let _instrument: InstrumentAny = resp.data;
6726 }
6728 }
6729
6730 #[tokio::test]
6731 async fn test_instrument_response_has_correct_instrument_id() {
6732 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6734 set_data_event_sender(sender);
6735
6736 let client_id = ClientId::from("DYDX-ID-TEST");
6737 let config = DydxDataClientConfig::default();
6738
6739 let http_client = DydxHttpClient::new(
6740 config.base_url_http.clone(),
6741 config.http_timeout_secs,
6742 config.http_proxy_url.clone(),
6743 config.is_testnet,
6744 None,
6745 )
6746 .unwrap();
6747
6748 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6749
6750 let instrument = create_test_instrument_any();
6751 let instrument_id = instrument.id();
6752 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6753 client.instruments.insert(symbol_key, instrument);
6754
6755 let request = RequestInstrument::new(
6756 instrument_id,
6757 None,
6758 None,
6759 Some(client_id),
6760 UUID4::new(),
6761 get_atomic_clock_realtime().get_time_ns(),
6762 None,
6763 );
6764
6765 assert!(client.request_instrument(&request).is_ok());
6766
6767 let timeout = tokio::time::Duration::from_secs(2);
6768 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6769 tokio::time::timeout(timeout, rx.recv()).await
6770 {
6771 assert_eq!(
6773 resp.instrument_id, instrument_id,
6774 "instrument_id should match requested ID"
6775 );
6776 }
6777 }
6778
6779 #[tokio::test]
6780 async fn test_instrument_response_includes_metadata() {
6781 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6783 set_data_event_sender(sender);
6784
6785 let client_id = ClientId::from("DYDX-META-TEST");
6786 let config = DydxDataClientConfig::default();
6787
6788 let http_client = DydxHttpClient::new(
6789 config.base_url_http.clone(),
6790 config.http_timeout_secs,
6791 config.http_proxy_url.clone(),
6792 config.is_testnet,
6793 None,
6794 )
6795 .unwrap();
6796
6797 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6798
6799 let instrument = create_test_instrument_any();
6800 let instrument_id = instrument.id();
6801 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6802 client.instruments.insert(symbol_key, instrument);
6803
6804 let request_id = UUID4::new();
6805 let start = Some(Utc::now() - chrono::Duration::hours(1));
6806 let end = Some(Utc::now());
6807 let ts_init = get_atomic_clock_realtime().get_time_ns();
6808
6809 let request = RequestInstrument::new(
6810 instrument_id,
6811 start,
6812 end,
6813 Some(client_id),
6814 request_id,
6815 ts_init,
6816 None,
6817 );
6818
6819 assert!(client.request_instrument(&request).is_ok());
6820
6821 let timeout = tokio::time::Duration::from_secs(2);
6822 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6823 tokio::time::timeout(timeout, rx.recv()).await
6824 {
6825 assert_eq!(
6827 resp.correlation_id, request_id,
6828 "correlation_id should match"
6829 );
6830 assert_eq!(resp.client_id, client_id, "client_id should match");
6831 assert!(resp.ts_init > 0, "ts_init should be set");
6832
6833 let _start = resp.start;
6835 let _end = resp.end;
6836
6837 let _params = resp.params;
6839 }
6840 }
6841
6842 #[tokio::test]
6843 async fn test_instrument_response_matches_requested_instrument() {
6844 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6846 set_data_event_sender(sender);
6847
6848 let client_id = ClientId::from("DYDX-MATCH-TEST");
6849 let config = DydxDataClientConfig::default();
6850
6851 let http_client = DydxHttpClient::new(
6852 config.base_url_http.clone(),
6853 config.http_timeout_secs,
6854 config.http_proxy_url.clone(),
6855 config.is_testnet,
6856 None,
6857 )
6858 .unwrap();
6859
6860 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6861
6862 let instrument = create_test_instrument_any();
6863 let instrument_id = instrument.id();
6864 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6865 client.instruments.insert(symbol_key, instrument.clone());
6866
6867 let request = RequestInstrument::new(
6868 instrument_id,
6869 None,
6870 None,
6871 Some(client_id),
6872 UUID4::new(),
6873 get_atomic_clock_realtime().get_time_ns(),
6874 None,
6875 );
6876
6877 assert!(client.request_instrument(&request).is_ok());
6878
6879 let timeout = tokio::time::Duration::from_secs(2);
6880 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6881 tokio::time::timeout(timeout, rx.recv()).await
6882 {
6883 assert_eq!(
6885 resp.data.id(),
6886 instrument_id,
6887 "Returned instrument should match requested"
6888 );
6889 assert_eq!(
6890 resp.instrument_id, instrument_id,
6891 "instrument_id field should match"
6892 );
6893
6894 assert_eq!(
6896 resp.data.id(),
6897 resp.instrument_id,
6898 "data.id() should match instrument_id field"
6899 );
6900 }
6901 }
6902
6903 #[tokio::test]
6904 async fn test_instrument_response_complete_structure() {
6905 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6907 set_data_event_sender(sender);
6908
6909 let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6910 let config = DydxDataClientConfig::default();
6911
6912 let http_client = DydxHttpClient::new(
6913 config.base_url_http.clone(),
6914 config.http_timeout_secs,
6915 config.http_proxy_url.clone(),
6916 config.is_testnet,
6917 None,
6918 )
6919 .unwrap();
6920
6921 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6922
6923 let instrument = create_test_instrument_any();
6924 let instrument_id = instrument.id();
6925 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6926 client.instruments.insert(symbol_key, instrument.clone());
6927
6928 let request_id = UUID4::new();
6929 let ts_init = get_atomic_clock_realtime().get_time_ns();
6930
6931 let request = RequestInstrument::new(
6932 instrument_id,
6933 None,
6934 None,
6935 Some(client_id),
6936 request_id,
6937 ts_init,
6938 None,
6939 );
6940
6941 assert!(client.request_instrument(&request).is_ok());
6942
6943 let timeout = tokio::time::Duration::from_secs(2);
6944 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6945 tokio::time::timeout(timeout, rx.recv()).await
6946 {
6947 let _boxed: Box<InstrumentResponse> = resp.clone();
6950
6951 assert_eq!(resp.correlation_id, request_id);
6953 assert_eq!(resp.client_id, client_id);
6954 assert_eq!(resp.instrument_id, instrument_id);
6955 assert!(resp.ts_init > 0);
6956
6957 let returned_instrument: InstrumentAny = resp.data;
6959 assert_eq!(returned_instrument.id(), instrument_id);
6960
6961 let _start = resp.start;
6963 let _end = resp.end;
6964 let _params = resp.params;
6965 }
6966 }
6967
6968 #[tokio::test]
6969 async fn test_trades_response_contains_vec_trade_tick() {
6970 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6972 set_data_event_sender(sender);
6973
6974 let created_at = Utc::now();
6975 let http_trades = vec![
6976 crate::http::models::Trade {
6977 id: "trade-1".to_string(),
6978 side: OrderSide::Buy,
6979 size: dec!(1.0),
6980 price: dec!(100.0),
6981 created_at,
6982 created_at_height: 100,
6983 trade_type: crate::common::enums::DydxTradeType::Limit,
6984 },
6985 crate::http::models::Trade {
6986 id: "trade-2".to_string(),
6987 side: OrderSide::Sell,
6988 size: dec!(2.0),
6989 price: dec!(101.0),
6990 created_at: created_at + chrono::Duration::seconds(1),
6991 created_at_height: 101,
6992 trade_type: crate::common::enums::DydxTradeType::Limit,
6993 },
6994 ];
6995
6996 let trades_response = crate::http::models::TradesResponse {
6997 trades: http_trades,
6998 };
6999
7000 let state = TradesTestState {
7001 response: Arc::new(trades_response),
7002 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7003 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7004 };
7005
7006 let addr = start_trades_test_server(state).await;
7007 let base_url = format!("http://{addr}");
7008
7009 let client_id = ClientId::from("DYDX-VEC-TEST");
7010 let config = DydxDataClientConfig {
7011 base_url_http: Some(base_url),
7012 is_testnet: true,
7013 ..Default::default()
7014 };
7015
7016 let http_client = DydxHttpClient::new(
7017 config.base_url_http.clone(),
7018 config.http_timeout_secs,
7019 config.http_proxy_url.clone(),
7020 config.is_testnet,
7021 None,
7022 )
7023 .unwrap();
7024
7025 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7026
7027 let instrument = create_test_instrument_any();
7028 let instrument_id = instrument.id();
7029 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7030 client.instruments.insert(symbol_key, instrument);
7031
7032 let request = RequestTrades::new(
7033 instrument_id,
7034 None,
7035 None,
7036 None,
7037 Some(client_id),
7038 UUID4::new(),
7039 get_atomic_clock_realtime().get_time_ns(),
7040 None,
7041 );
7042
7043 assert!(client.request_trades(&request).is_ok());
7044
7045 let timeout = tokio::time::Duration::from_millis(500);
7046 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7047 tokio::time::timeout(timeout, rx.recv()).await
7048 {
7049 let trade_ticks: Vec<TradeTick> = resp.data;
7051 assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7052
7053 for tick in &trade_ticks {
7055 assert_eq!(tick.instrument_id, instrument_id);
7056 }
7057 }
7058 }
7059
7060 #[tokio::test]
7061 async fn test_trades_response_has_correct_instrument_id() {
7062 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7064 set_data_event_sender(sender);
7065
7066 let created_at = Utc::now();
7067 let http_trade = crate::http::models::Trade {
7068 id: "instrument-id-test".to_string(),
7069 side: OrderSide::Buy,
7070 size: dec!(1.0),
7071 price: dec!(100.0),
7072 created_at,
7073 created_at_height: 100,
7074 trade_type: crate::common::enums::DydxTradeType::Limit,
7075 };
7076
7077 let trades_response = crate::http::models::TradesResponse {
7078 trades: vec![http_trade],
7079 };
7080
7081 let state = TradesTestState {
7082 response: Arc::new(trades_response),
7083 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7084 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7085 };
7086
7087 let addr = start_trades_test_server(state).await;
7088 let base_url = format!("http://{addr}");
7089
7090 let client_id = ClientId::from("DYDX-INSTID-TEST");
7091 let config = DydxDataClientConfig {
7092 base_url_http: Some(base_url),
7093 is_testnet: true,
7094 ..Default::default()
7095 };
7096
7097 let http_client = DydxHttpClient::new(
7098 config.base_url_http.clone(),
7099 config.http_timeout_secs,
7100 config.http_proxy_url.clone(),
7101 config.is_testnet,
7102 None,
7103 )
7104 .unwrap();
7105
7106 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7107
7108 let instrument = create_test_instrument_any();
7109 let instrument_id = instrument.id();
7110 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7111 client.instruments.insert(symbol_key, instrument);
7112
7113 let request = RequestTrades::new(
7114 instrument_id,
7115 None,
7116 None,
7117 None,
7118 Some(client_id),
7119 UUID4::new(),
7120 get_atomic_clock_realtime().get_time_ns(),
7121 None,
7122 );
7123
7124 assert!(client.request_trades(&request).is_ok());
7125
7126 let timeout = tokio::time::Duration::from_millis(500);
7127 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7128 tokio::time::timeout(timeout, rx.recv()).await
7129 {
7130 assert_eq!(
7132 resp.instrument_id, instrument_id,
7133 "TradesResponse.instrument_id should match request"
7134 );
7135
7136 for tick in &resp.data {
7138 assert_eq!(
7139 tick.instrument_id, instrument_id,
7140 "Each TradeTick should have correct instrument_id"
7141 );
7142 }
7143 }
7144 }
7145
7146 #[tokio::test]
7147 async fn test_trades_response_properly_ordered_by_timestamp() {
7148 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7150 set_data_event_sender(sender);
7151
7152 let base_time = Utc::now();
7153 let http_trades = vec![
7154 crate::http::models::Trade {
7155 id: "trade-oldest".to_string(),
7156 side: OrderSide::Buy,
7157 size: dec!(1.0),
7158 price: dec!(100.0),
7159 created_at: base_time,
7160 created_at_height: 100,
7161 trade_type: crate::common::enums::DydxTradeType::Limit,
7162 },
7163 crate::http::models::Trade {
7164 id: "trade-middle".to_string(),
7165 side: OrderSide::Sell,
7166 size: dec!(2.0),
7167 price: dec!(101.0),
7168 created_at: base_time + chrono::Duration::seconds(1),
7169 created_at_height: 101,
7170 trade_type: crate::common::enums::DydxTradeType::Limit,
7171 },
7172 crate::http::models::Trade {
7173 id: "trade-newest".to_string(),
7174 side: OrderSide::Buy,
7175 size: dec!(3.0),
7176 price: dec!(102.0),
7177 created_at: base_time + chrono::Duration::seconds(2),
7178 created_at_height: 102,
7179 trade_type: crate::common::enums::DydxTradeType::Limit,
7180 },
7181 ];
7182
7183 let trades_response = crate::http::models::TradesResponse {
7184 trades: http_trades,
7185 };
7186
7187 let state = TradesTestState {
7188 response: Arc::new(trades_response),
7189 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7190 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7191 };
7192
7193 let addr = start_trades_test_server(state).await;
7194 let base_url = format!("http://{addr}");
7195
7196 let client_id = ClientId::from("DYDX-ORDER-TEST");
7197 let config = DydxDataClientConfig {
7198 base_url_http: Some(base_url),
7199 is_testnet: true,
7200 ..Default::default()
7201 };
7202
7203 let http_client = DydxHttpClient::new(
7204 config.base_url_http.clone(),
7205 config.http_timeout_secs,
7206 config.http_proxy_url.clone(),
7207 config.is_testnet,
7208 None,
7209 )
7210 .unwrap();
7211
7212 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7213
7214 let instrument = create_test_instrument_any();
7215 let instrument_id = instrument.id();
7216 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7217 client.instruments.insert(symbol_key, instrument);
7218
7219 let request = RequestTrades::new(
7220 instrument_id,
7221 None,
7222 None,
7223 None,
7224 Some(client_id),
7225 UUID4::new(),
7226 get_atomic_clock_realtime().get_time_ns(),
7227 None,
7228 );
7229
7230 assert!(client.request_trades(&request).is_ok());
7231
7232 let timeout = tokio::time::Duration::from_millis(500);
7233 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7234 tokio::time::timeout(timeout, rx.recv()).await
7235 {
7236 let trade_ticks = resp.data;
7238 assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7239
7240 for i in 1..trade_ticks.len() {
7242 assert!(
7243 trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7244 "Trades should be ordered by timestamp (ts_event) in ascending order"
7245 );
7246 }
7247
7248 assert!(
7250 trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7251 "First trade should be before second"
7252 );
7253 assert!(
7254 trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7255 "Second trade should be before third"
7256 );
7257 }
7258 }
7259
7260 #[tokio::test]
7261 async fn test_trades_response_all_trade_tick_fields_populated() {
7262 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7264 set_data_event_sender(sender);
7265
7266 let created_at = Utc::now();
7267 let http_trade = crate::http::models::Trade {
7268 id: "field-test".to_string(),
7269 side: OrderSide::Buy,
7270 size: dec!(5.5),
7271 price: dec!(12345.67),
7272 created_at,
7273 created_at_height: 999,
7274 trade_type: crate::common::enums::DydxTradeType::Limit,
7275 };
7276
7277 let trades_response = crate::http::models::TradesResponse {
7278 trades: vec![http_trade],
7279 };
7280
7281 let state = TradesTestState {
7282 response: Arc::new(trades_response),
7283 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7284 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7285 };
7286
7287 let addr = start_trades_test_server(state).await;
7288 let base_url = format!("http://{addr}");
7289
7290 let client_id = ClientId::from("DYDX-FIELDS-TEST");
7291 let config = DydxDataClientConfig {
7292 base_url_http: Some(base_url),
7293 is_testnet: true,
7294 ..Default::default()
7295 };
7296
7297 let http_client = DydxHttpClient::new(
7298 config.base_url_http.clone(),
7299 config.http_timeout_secs,
7300 config.http_proxy_url.clone(),
7301 config.is_testnet,
7302 None,
7303 )
7304 .unwrap();
7305
7306 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7307
7308 let instrument = create_test_instrument_any();
7309 let instrument_id = instrument.id();
7310 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7311 client.instruments.insert(symbol_key, instrument);
7312
7313 let request = RequestTrades::new(
7314 instrument_id,
7315 None,
7316 None,
7317 None,
7318 Some(client_id),
7319 UUID4::new(),
7320 get_atomic_clock_realtime().get_time_ns(),
7321 None,
7322 );
7323
7324 assert!(client.request_trades(&request).is_ok());
7325
7326 let timeout = tokio::time::Duration::from_millis(500);
7327 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7328 tokio::time::timeout(timeout, rx.recv()).await
7329 {
7330 assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7331 let tick = &resp.data[0];
7332
7333 assert_eq!(
7335 tick.instrument_id, instrument_id,
7336 "instrument_id should be set"
7337 );
7338 assert!(tick.price.as_f64() > 0.0, "price should be positive");
7339 assert!(tick.size.as_f64() > 0.0, "size should be positive");
7340
7341 match tick.aggressor_side {
7343 AggressorSide::Buyer | AggressorSide::Seller => {
7344 }
7346 AggressorSide::NoAggressor => {
7347 panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7348 }
7349 }
7350
7351 assert!(
7353 !tick.trade_id.to_string().is_empty(),
7354 "trade_id should be set"
7355 );
7356
7357 assert!(tick.ts_event > 0, "ts_event should be set");
7359 assert!(tick.ts_init > 0, "ts_init should be set");
7360 assert!(
7361 tick.ts_init >= tick.ts_event,
7362 "ts_init should be >= ts_event"
7363 );
7364 }
7365 }
7366
7367 #[tokio::test]
7368 async fn test_trades_response_includes_metadata() {
7369 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7371 set_data_event_sender(sender);
7372
7373 let created_at = Utc::now();
7374 let http_trade = crate::http::models::Trade {
7375 id: "metadata-test".to_string(),
7376 side: OrderSide::Buy,
7377 size: dec!(1.0),
7378 price: dec!(100.0),
7379 created_at,
7380 created_at_height: 100,
7381 trade_type: crate::common::enums::DydxTradeType::Limit,
7382 };
7383
7384 let trades_response = crate::http::models::TradesResponse {
7385 trades: vec![http_trade],
7386 };
7387
7388 let state = TradesTestState {
7389 response: Arc::new(trades_response),
7390 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7391 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7392 };
7393
7394 let addr = start_trades_test_server(state).await;
7395 let base_url = format!("http://{addr}");
7396
7397 let client_id = ClientId::from("DYDX-META-TEST");
7398 let config = DydxDataClientConfig {
7399 base_url_http: Some(base_url),
7400 is_testnet: true,
7401 ..Default::default()
7402 };
7403
7404 let http_client = DydxHttpClient::new(
7405 config.base_url_http.clone(),
7406 config.http_timeout_secs,
7407 config.http_proxy_url.clone(),
7408 config.is_testnet,
7409 None,
7410 )
7411 .unwrap();
7412
7413 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7414
7415 let instrument = create_test_instrument_any();
7416 let instrument_id = instrument.id();
7417 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7418 client.instruments.insert(symbol_key, instrument);
7419
7420 let request_id = UUID4::new();
7421 let request = RequestTrades::new(
7422 instrument_id,
7423 None,
7424 None,
7425 None,
7426 Some(client_id),
7427 request_id,
7428 get_atomic_clock_realtime().get_time_ns(),
7429 None,
7430 );
7431
7432 assert!(client.request_trades(&request).is_ok());
7433
7434 let timeout = tokio::time::Duration::from_millis(500);
7435 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7436 tokio::time::timeout(timeout, rx.recv()).await
7437 {
7438 assert_eq!(
7440 resp.correlation_id, request_id,
7441 "correlation_id should match request"
7442 );
7443 assert_eq!(resp.client_id, client_id, "client_id should be set");
7444 assert_eq!(
7445 resp.instrument_id, instrument_id,
7446 "instrument_id should be set"
7447 );
7448 assert!(resp.ts_init > 0, "ts_init should be set");
7449
7450 let _start = resp.start;
7451 let _end = resp.end;
7452 let _params = resp.params;
7453 }
7454 }
7455
7456 #[tokio::test]
7457 async fn test_orderbook_cache_growth_with_many_instruments() {
7458 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7459 set_data_event_sender(sender);
7460
7461 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7462 let config = DydxDataClientConfig {
7463 base_url_http: Some(base_url),
7464 is_testnet: true,
7465 ..Default::default()
7466 };
7467
7468 let http_client = DydxHttpClient::new(
7469 config.base_url_http.clone(),
7470 config.http_timeout_secs,
7471 config.http_proxy_url.clone(),
7472 config.is_testnet,
7473 None,
7474 )
7475 .unwrap();
7476
7477 let client =
7478 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7479
7480 let initial_capacity = client.order_books.capacity();
7481
7482 for i in 0..100 {
7483 let symbol = format!("INSTRUMENT-{i}");
7484 let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7485 client.order_books.insert(
7486 instrument_id,
7487 OrderBook::new(instrument_id, BookType::L2_MBP),
7488 );
7489 }
7490
7491 assert_eq!(client.order_books.len(), 100);
7492 assert!(client.order_books.capacity() >= initial_capacity);
7493
7494 client.order_books.clear();
7495 assert_eq!(client.order_books.len(), 0);
7496 }
7497
7498 #[rstest]
7499 fn test_instrument_id_validation_rejects_invalid_formats() {
7500 let test_cases = vec![
7502 ("", "Empty string missing separator"),
7503 ("INVALID", "No venue separator"),
7504 ("NO-VENUE", "No venue separator"),
7505 (".DYDX", "Empty symbol"),
7506 ("SYMBOL.", "Empty venue"),
7507 ];
7508
7509 for (invalid_id, description) in test_cases {
7510 let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7511 assert!(
7512 result.is_err(),
7513 "Expected {invalid_id} to panic: {description}"
7514 );
7515 }
7516 }
7517
7518 #[rstest]
7519 fn test_instrument_id_validation_accepts_valid_formats() {
7520 let valid_ids = vec![
7521 "BTC-USD-PERP.DYDX",
7522 "ETH-USD-PERP.DYDX",
7523 "SOL-USD.DYDX",
7524 "AVAX-USD-PERP.DYDX",
7525 ];
7526
7527 for valid_id in valid_ids {
7528 let instrument_id = InstrumentId::from(valid_id);
7529 assert!(
7530 !instrument_id.symbol.as_str().is_empty()
7531 && !instrument_id.venue.as_str().is_empty(),
7532 "Expected {valid_id} to have non-empty symbol and venue"
7533 );
7534 }
7535 }
7536
7537 #[tokio::test]
7538 async fn test_request_bars_with_inverted_date_range() {
7539 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7540 set_data_event_sender(sender);
7541
7542 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7543 let config = DydxDataClientConfig {
7544 base_url_http: Some(base_url),
7545 is_testnet: true,
7546 ..Default::default()
7547 };
7548
7549 let http_client = DydxHttpClient::new(
7550 config.base_url_http.clone(),
7551 config.http_timeout_secs,
7552 config.http_proxy_url.clone(),
7553 config.is_testnet,
7554 None,
7555 )
7556 .unwrap();
7557
7558 let client =
7559 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7560
7561 let instrument = create_test_instrument_any();
7562 let instrument_id = instrument.id();
7563 client
7564 .instruments
7565 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7566
7567 let spec = BarSpecification {
7568 step: std::num::NonZeroUsize::new(1).unwrap(),
7569 aggregation: BarAggregation::Minute,
7570 price_type: PriceType::Last,
7571 };
7572 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7573
7574 let now = Utc::now();
7575 let start = Some(now);
7576 let end = Some(now - chrono::Duration::hours(1));
7577
7578 let request = RequestBars::new(
7579 bar_type,
7580 start,
7581 end,
7582 None,
7583 Some(ClientId::from("dydx_test")),
7584 UUID4::new(),
7585 get_atomic_clock_realtime().get_time_ns(),
7586 None,
7587 );
7588
7589 let result = client.request_bars(&request);
7590 assert!(result.is_ok());
7591 }
7592
7593 #[tokio::test]
7594 async fn test_request_bars_with_zero_limit() {
7595 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7596 set_data_event_sender(sender);
7597
7598 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7599 let config = DydxDataClientConfig {
7600 base_url_http: Some(base_url),
7601 is_testnet: true,
7602 ..Default::default()
7603 };
7604
7605 let http_client = DydxHttpClient::new(
7606 config.base_url_http.clone(),
7607 config.http_timeout_secs,
7608 config.http_proxy_url.clone(),
7609 config.is_testnet,
7610 None,
7611 )
7612 .unwrap();
7613
7614 let client =
7615 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7616
7617 let instrument = create_test_instrument_any();
7618 let instrument_id = instrument.id();
7619 client
7620 .instruments
7621 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7622
7623 let spec = BarSpecification {
7624 step: std::num::NonZeroUsize::new(1).unwrap(),
7625 aggregation: BarAggregation::Minute,
7626 price_type: PriceType::Last,
7627 };
7628 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7629
7630 let request = RequestBars::new(
7631 bar_type,
7632 None,
7633 None,
7634 Some(std::num::NonZeroUsize::new(1).unwrap()),
7635 Some(ClientId::from("dydx_test")),
7636 UUID4::new(),
7637 get_atomic_clock_realtime().get_time_ns(),
7638 None,
7639 );
7640
7641 let result = client.request_bars(&request);
7642 assert!(result.is_ok());
7643 }
7644
7645 #[tokio::test]
7646 async fn test_request_trades_with_excessive_limit() {
7647 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7648 set_data_event_sender(sender);
7649
7650 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7651 let config = DydxDataClientConfig {
7652 base_url_http: Some(base_url),
7653 is_testnet: true,
7654 ..Default::default()
7655 };
7656
7657 let http_client = DydxHttpClient::new(
7658 config.base_url_http.clone(),
7659 config.http_timeout_secs,
7660 config.http_proxy_url.clone(),
7661 config.is_testnet,
7662 None,
7663 )
7664 .unwrap();
7665
7666 let client =
7667 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7668
7669 let instrument = create_test_instrument_any();
7670 let instrument_id = instrument.id();
7671 client
7672 .instruments
7673 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7674
7675 let request = RequestTrades::new(
7676 instrument_id,
7677 None,
7678 None,
7679 Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7680 Some(ClientId::from("dydx_test")),
7681 UUID4::new(),
7682 get_atomic_clock_realtime().get_time_ns(),
7683 None,
7684 );
7685
7686 let result = client.request_trades(&request);
7687 assert!(result.is_ok());
7688 }
7689
7690 #[rstest]
7691 fn test_candle_topic_format() {
7692 let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7693 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7694 let resolution = "1MIN";
7695 let topic = format!("{ticker}/{resolution}");
7696
7697 assert_eq!(topic, "BTC-USD/1MIN");
7698 assert!(!topic.contains("-PERP"));
7699 assert!(!topic.contains(".DYDX"));
7700 }
7701}