1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use chrono::{DateTime, Utc};
25use dashmap::DashMap;
26use nautilus_common::{
27 live::runner::get_data_event_sender,
28 messages::{
29 DataEvent, DataResponse,
30 data::{
31 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
32 RequestInstruments, RequestTrades, SubscribeBars, SubscribeBookDeltas,
33 SubscribeBookSnapshots, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
34 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
35 UnsubscribeBookSnapshots, UnsubscribeInstrument, UnsubscribeInstruments,
36 UnsubscribeQuotes, UnsubscribeTrades,
37 },
38 },
39};
40use nautilus_core::{
41 UnixNanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_data::client::DataClient;
45use nautilus_model::{
46 data::{
47 Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, IndexPriceUpdate,
48 OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, QuoteTick,
49 },
50 enums::{BarAggregation, BookAction, BookType, OrderSide, RecordFlag},
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53 orderbook::OrderBook,
54 types::{Price, Quantity, price::PriceRaw},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61 common::{consts::DYDX_VENUE, parse::extract_raw_symbol},
62 config::DydxDataClientConfig,
63 http::client::DydxHttpClient,
64 types::DydxOraclePrice,
65 websocket::client::DydxWebSocketClient,
66};
67
/// Borrowed view of the shared state that is handed to the WebSocket message handler.
///
/// `connect` builds one of these for every inbound message so the handler takes a
/// single context argument instead of nine separate references.
struct WsMessageContext<'a> {
    /// Channel on which `DataEvent`s are sent.
    data_sender: &'a tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache; entries are keyed by the instrument's symbol.
    instruments: &'a Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order books keyed by instrument id.
    order_books: &'a Arc<DashMap<InstrumentId, OrderBook>>,
    /// One `QuoteTick` per instrument id (presumably the most recent quote — TODO confirm
    /// against the handler, which is not visible here).
    last_quotes: &'a Arc<DashMap<InstrumentId, QuoteTick>>,
    /// Clone of the client's optional WebSocket client.
    ws_client: &'a Option<DydxWebSocketClient>,
    /// Instruments with an order book subscription recorded by this client.
    active_orderbook_subs: &'a Arc<DashMap<InstrumentId, ()>>,
    /// Instruments with a trade subscription recorded by this client.
    active_trade_subs: &'a Arc<DashMap<InstrumentId, ()>>,
    /// Bar subscriptions keyed by `(instrument id, dYdX resolution label)`.
    active_bar_subs: &'a Arc<DashMap<(InstrumentId, String), BarType>>,
    /// One `Bar` per bar type (presumably the bar still being built — TODO confirm).
    incomplete_bars: &'a Arc<DashMap<BarType, Bar>>,
}
80
/// Data client for the dYdX venue.
///
/// Combines an HTTP client (instrument, trade and candle requests) with an optional
/// WebSocket client (streaming subscriptions) and keeps the shared maps that the
/// background tasks and the message handler operate on.
#[derive(Debug)]
pub struct DydxDataClient {
    /// Identifier returned by `client_id()` and used as the default client id on responses.
    client_id: ClientId,
    /// Client configuration (not read by the code visible in this section).
    config: DydxDataClientConfig,
    /// HTTP client used for instrument, trade and candle requests.
    http_client: DydxHttpClient,
    /// WebSocket client used for subscriptions; `None` disables streaming.
    ws_client: Option<DydxWebSocketClient>,
    /// Connection flag, set by `connect` and cleared by `stop`/`reset`/`disconnect`.
    is_connected: AtomicBool,
    /// Token cancelled by `stop` and replaced by `reset`.
    cancellation_token: CancellationToken,
    /// Handles of background tasks spawned by this client.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are sent.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by symbol; obtained from the HTTP client in `new`.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Clock used to timestamp responses and generated data.
    clock: &'static AtomicTime,
    /// Order books keyed by instrument id.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    /// One `QuoteTick` per instrument id (presumably the latest quote — TODO confirm).
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    /// One `Bar` per bar type (presumably the bar still being built — TODO confirm).
    incomplete_bars: Arc<DashMap<BarType, Bar>>,
    /// Maps a candle topic of the form `"{ticker}/{resolution}"` to its bar type.
    bar_type_mappings: Arc<DashMap<String, BarType>>,
    /// Instruments with an order book subscription recorded by this client.
    active_orderbook_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Instruments with a trade subscription recorded by this client.
    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
    /// Bar subscriptions keyed by `(instrument id, dYdX resolution label)`.
    active_bar_subs: Arc<DashMap<(InstrumentId, String), BarType>>,
}
129
130impl DydxDataClient {
131 fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
137 match spec.step.get() {
138 1 => match spec.aggregation {
139 BarAggregation::Minute => Ok("1MIN"),
140 BarAggregation::Hour => Ok("1HOUR"),
141 BarAggregation::Day => Ok("1DAY"),
142 _ => anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation),
143 },
144 5 if spec.aggregation == BarAggregation::Minute => Ok("5MINS"),
145 15 if spec.aggregation == BarAggregation::Minute => Ok("15MINS"),
146 30 if spec.aggregation == BarAggregation::Minute => Ok("30MINS"),
147 4 if spec.aggregation == BarAggregation::Hour => Ok("4HOURS"),
148 step => anyhow::bail!(
149 "Unsupported bar step: {step} with aggregation {:?}",
150 spec.aggregation
151 ),
152 }
153 }
154
155 pub fn new(
161 client_id: ClientId,
162 config: DydxDataClientConfig,
163 http_client: DydxHttpClient,
164 ws_client: Option<DydxWebSocketClient>,
165 ) -> anyhow::Result<Self> {
166 let clock = get_atomic_clock_realtime();
167 let data_sender = get_data_event_sender();
168
169 let instruments_cache = http_client.instruments().clone();
171
172 Ok(Self {
173 client_id,
174 config,
175 http_client,
176 ws_client,
177 is_connected: AtomicBool::new(false),
178 cancellation_token: CancellationToken::new(),
179 tasks: Vec::new(),
180 data_sender,
181 instruments: instruments_cache,
182 clock,
183 order_books: Arc::new(DashMap::new()),
184 last_quotes: Arc::new(DashMap::new()),
185 incomplete_bars: Arc::new(DashMap::new()),
186 bar_type_mappings: Arc::new(DashMap::new()),
187 active_orderbook_subs: Arc::new(DashMap::new()),
188 active_trade_subs: Arc::new(DashMap::new()),
189 active_bar_subs: Arc::new(DashMap::new()),
190 })
191 }
192
193 #[must_use]
195 pub fn venue(&self) -> Venue {
196 *DYDX_VENUE
197 }
198
199 fn ws_client(&self) -> anyhow::Result<&DydxWebSocketClient> {
200 self.ws_client
201 .as_ref()
202 .context("websocket client not initialized; call connect first")
203 }
204
205 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DydxWebSocketClient> {
207 self.ws_client
208 .as_mut()
209 .context("websocket client not initialized; call connect first")
210 }
211
212 #[must_use]
214 pub fn is_connected(&self) -> bool {
215 self.is_connected.load(Ordering::Relaxed)
216 }
217
218 fn spawn_ws<F>(&self, fut: F, context: &'static str)
222 where
223 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
224 {
225 tokio::spawn(async move {
226 if let Err(e) = fut.await {
227 tracing::error!("{context}: {e:?}");
228 }
229 });
230 }
231
232 async fn bootstrap_instruments(&mut self) -> anyhow::Result<Vec<InstrumentAny>> {
247 tracing::info!("Bootstrapping dYdX instruments");
248
249 let instruments = self
252 .http_client
253 .request_instruments(None, None, None)
254 .await
255 .context("failed to load instruments from dYdX")?;
256
257 if instruments.is_empty() {
258 tracing::warn!("No dYdX instruments were loaded");
259 return Ok(instruments);
260 }
261
262 tracing::info!("Loaded {} dYdX instruments", instruments.len());
263
264 self.http_client.cache_instruments(instruments.clone());
266
267 if let Some(ref ws) = self.ws_client {
269 ws.cache_instruments(instruments.clone());
270 }
271
272 Ok(instruments)
273 }
274}
275
276#[async_trait::async_trait(?Send)]
278impl DataClient for DydxDataClient {
279 fn client_id(&self) -> ClientId {
280 self.client_id
281 }
282
283 fn venue(&self) -> Option<Venue> {
284 Some(*DYDX_VENUE)
285 }
286
287 fn start(&mut self) -> anyhow::Result<()> {
288 tracing::info!(
289 client_id = %self.client_id,
290 is_testnet = self.http_client.is_testnet(),
291 "Starting dYdX data client"
292 );
293 Ok(())
294 }
295
296 fn stop(&mut self) -> anyhow::Result<()> {
297 tracing::info!("Stopping dYdX data client {}", self.client_id);
298 self.cancellation_token.cancel();
299 self.is_connected.store(false, Ordering::Relaxed);
300 Ok(())
301 }
302
303 fn reset(&mut self) -> anyhow::Result<()> {
304 tracing::debug!("Resetting dYdX data client {}", self.client_id);
305 self.is_connected.store(false, Ordering::Relaxed);
306 self.cancellation_token = CancellationToken::new();
307 self.tasks.clear();
308 Ok(())
309 }
310
311 fn dispose(&mut self) -> anyhow::Result<()> {
312 tracing::debug!("Disposing dYdX data client {}", self.client_id);
313 self.stop()
314 }
315
316 async fn connect(&mut self) -> anyhow::Result<()> {
317 if self.is_connected() {
318 return Ok(());
319 }
320
321 tracing::info!("Connecting dYdX data client");
322
323 self.bootstrap_instruments().await?;
325
326 if self.ws_client.is_some() {
328 let ws = self.ws_client_mut()?;
329
330 ws.connect()
331 .await
332 .context("failed to connect dYdX websocket")?;
333
334 ws.subscribe_markets()
335 .await
336 .context("failed to subscribe to markets channel")?;
337
338 if let Some(rx) = ws.take_receiver() {
340 let data_tx = self.data_sender.clone();
341 let instruments = self.instruments.clone();
342 let order_books = self.order_books.clone();
343 let last_quotes = self.last_quotes.clone();
344 let ws_client = self.ws_client.clone();
345 let active_orderbook_subs = self.active_orderbook_subs.clone();
346 let active_trade_subs = self.active_trade_subs.clone();
347 let active_bar_subs = self.active_bar_subs.clone();
348 let incomplete_bars = self.incomplete_bars.clone();
349
350 let task = tokio::spawn(async move {
351 let mut rx = rx;
352 while let Some(msg) = rx.recv().await {
353 let ctx = WsMessageContext {
354 data_sender: &data_tx,
355 instruments: &instruments,
356 order_books: &order_books,
357 last_quotes: &last_quotes,
358 ws_client: &ws_client,
359 active_orderbook_subs: &active_orderbook_subs,
360 active_trade_subs: &active_trade_subs,
361 active_bar_subs: &active_bar_subs,
362 incomplete_bars: &incomplete_bars,
363 };
364 Self::handle_ws_message(msg, &ctx);
365 }
366 });
367 self.tasks.push(task);
368 } else {
369 tracing::warn!("No inbound WS receiver available after connect");
370 }
371 }
372
373 self.start_orderbook_refresh_task()?;
375
376 self.start_instrument_refresh_task()?;
378
379 self.is_connected.store(true, Ordering::Relaxed);
380 tracing::info!("Connected dYdX data client");
381
382 Ok(())
383 }
384
385 async fn disconnect(&mut self) -> anyhow::Result<()> {
386 if !self.is_connected() {
387 return Ok(());
388 }
389
390 tracing::info!("Disconnecting dYdX data client");
391
392 if let Some(ref mut ws) = self.ws_client {
394 ws.disconnect()
395 .await
396 .context("failed to disconnect dYdX websocket")?;
397 }
398
399 self.is_connected.store(false, Ordering::Relaxed);
400 tracing::info!("Disconnected dYdX data client");
401
402 Ok(())
403 }
404
405 fn is_connected(&self) -> bool {
406 self.is_connected.load(Ordering::Relaxed)
407 }
408
409 fn is_disconnected(&self) -> bool {
410 !self.is_connected()
411 }
412
413 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
414 tracing::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
418 Ok(())
419 }
420
421 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
422 tracing::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
425 Ok(())
426 }
427
428 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
429 tracing::debug!("subscribe_instruments: dYdX auto-subscribes via markets channel");
432 Ok(())
433 }
434
435 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
436 tracing::debug!("subscribe_instrument: dYdX auto-subscribes via markets channel");
439 Ok(())
440 }
441
442 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
443 let ws = self.ws_client()?.clone();
444 let instrument_id = cmd.instrument_id;
445
446 self.active_trade_subs.insert(instrument_id, ());
448
449 self.spawn_ws(
450 async move {
451 ws.subscribe_trades(instrument_id)
452 .await
453 .context("trade subscription")
454 },
455 "dYdX trade subscription",
456 );
457
458 Ok(())
459 }
460
461 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
462 if cmd.book_type != BookType::L2_MBP {
463 anyhow::bail!(
464 "dYdX only supports L2_MBP order book deltas, received {:?}",
465 cmd.book_type
466 );
467 }
468
469 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
471
472 self.active_orderbook_subs.insert(cmd.instrument_id, ());
474
475 let ws = self.ws_client()?.clone();
476 let instrument_id = cmd.instrument_id;
477
478 self.spawn_ws(
479 async move {
480 ws.subscribe_orderbook(instrument_id)
481 .await
482 .context("orderbook subscription")
483 },
484 "dYdX orderbook subscription",
485 );
486
487 Ok(())
488 }
489
490 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
491 if cmd.book_type != BookType::L2_MBP {
492 anyhow::bail!(
493 "dYdX only supports L2_MBP order book snapshots, received {:?}",
494 cmd.book_type
495 );
496 }
497
498 self.active_orderbook_subs.insert(cmd.instrument_id, ());
500
501 let ws = self.ws_client()?.clone();
502 let instrument_id = cmd.instrument_id;
503
504 tokio::spawn(async move {
505 if let Err(e) = ws.subscribe_orderbook(instrument_id).await {
506 tracing::error!(
507 "Failed to subscribe to orderbook snapshot for {instrument_id}: {e:?}"
508 );
509 }
510 });
511
512 Ok(())
513 }
514
515 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
516 tracing::debug!(
519 "subscribe_quotes for {}: delegating to subscribe_book_deltas (no native quotes channel)",
520 cmd.instrument_id
521 );
522
523 let book_cmd = SubscribeBookDeltas {
525 client_id: cmd.client_id,
526 venue: cmd.venue,
527 instrument_id: cmd.instrument_id,
528 book_type: BookType::L2_MBP,
529 depth: None,
530 managed: false,
531 params: None,
532 command_id: cmd.command_id,
533 ts_init: cmd.ts_init,
534 };
535
536 self.subscribe_book_deltas(&book_cmd)
537 }
538
539 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
540 let ws = self.ws_client()?.clone();
541 let instrument_id = cmd.bar_type.instrument_id();
542 let spec = cmd.bar_type.spec();
543
544 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
546
547 let bar_type = cmd.bar_type;
549 self.active_bar_subs
550 .insert((instrument_id, resolution.to_string()), bar_type);
551
552 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
554 let topic = format!("{ticker}/{resolution}");
555 self.bar_type_mappings.insert(topic.clone(), bar_type);
556
557 self.spawn_ws(
558 async move {
559 if let Err(e) =
561 ws.send_command(crate::websocket::handler::HandlerCommand::RegisterBarType {
562 topic,
563 bar_type,
564 })
565 {
566 anyhow::bail!("Failed to register bar type: {e}");
567 }
568
569 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
571
572 ws.subscribe_candles(instrument_id, resolution)
573 .await
574 .context("candles subscription")
575 },
576 "dYdX candles subscription",
577 );
578
579 Ok(())
580 }
581
582 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
583 self.active_trade_subs.remove(&cmd.instrument_id);
585
586 let ws = self.ws_client()?.clone();
587 let instrument_id = cmd.instrument_id;
588
589 self.spawn_ws(
590 async move {
591 ws.unsubscribe_trades(instrument_id)
592 .await
593 .context("trade unsubscription")
594 },
595 "dYdX trade unsubscription",
596 );
597
598 Ok(())
599 }
600
601 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
602 self.active_orderbook_subs.remove(&cmd.instrument_id);
604
605 let ws = self.ws_client()?.clone();
606 let instrument_id = cmd.instrument_id;
607
608 self.spawn_ws(
609 async move {
610 ws.unsubscribe_orderbook(instrument_id)
611 .await
612 .context("orderbook unsubscription")
613 },
614 "dYdX orderbook unsubscription",
615 );
616
617 Ok(())
618 }
619
620 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
621 self.active_orderbook_subs.remove(&cmd.instrument_id);
625
626 let ws = self.ws_client()?.clone();
627 let instrument_id = cmd.instrument_id;
628
629 self.spawn_ws(
630 async move {
631 ws.unsubscribe_orderbook(instrument_id)
632 .await
633 .context("orderbook snapshot unsubscription")
634 },
635 "dYdX orderbook snapshot unsubscription",
636 );
637
638 Ok(())
639 }
640
641 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
642 tracing::debug!(
644 "unsubscribe_quotes for {}: delegating to unsubscribe_book_deltas (no native quotes channel)",
645 cmd.instrument_id
646 );
647
648 let book_cmd = UnsubscribeBookDeltas {
649 instrument_id: cmd.instrument_id,
650 client_id: cmd.client_id,
651 venue: cmd.venue,
652 command_id: cmd.command_id,
653 ts_init: cmd.ts_init,
654 params: cmd.params.clone(),
655 };
656
657 self.unsubscribe_book_deltas(&book_cmd)
658 }
659
660 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
661 let ws = self.ws_client()?.clone();
662 let instrument_id = cmd.bar_type.instrument_id();
663 let spec = cmd.bar_type.spec();
664
665 let resolution = match spec.step.get() {
667 1 => match spec.aggregation {
668 BarAggregation::Minute => "1MIN",
669 BarAggregation::Hour => "1HOUR",
670 BarAggregation::Day => "1DAY",
671 _ => {
672 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
673 }
674 },
675 5 => {
676 if spec.aggregation == BarAggregation::Minute {
677 "5MINS"
678 } else {
679 anyhow::bail!("Unsupported 5-step aggregation: {:?}", spec.aggregation);
680 }
681 }
682 15 => {
683 if spec.aggregation == BarAggregation::Minute {
684 "15MINS"
685 } else {
686 anyhow::bail!("Unsupported 15-step aggregation: {:?}", spec.aggregation);
687 }
688 }
689 30 => {
690 if spec.aggregation == BarAggregation::Minute {
691 "30MINS"
692 } else {
693 anyhow::bail!("Unsupported 30-step aggregation: {:?}", spec.aggregation);
694 }
695 }
696 4 => {
697 if spec.aggregation == BarAggregation::Hour {
698 "4HOURS"
699 } else {
700 anyhow::bail!("Unsupported 4-step aggregation: {:?}", spec.aggregation);
701 }
702 }
703 step => {
704 anyhow::bail!("Unsupported bar step: {step}");
705 }
706 };
707
708 self.active_bar_subs
710 .remove(&(instrument_id, resolution.to_string()));
711
712 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
714 let topic = format!("{ticker}/{resolution}");
715 self.bar_type_mappings.remove(&topic);
716
717 if let Err(e) =
718 ws.send_command(crate::websocket::handler::HandlerCommand::UnregisterBarType { topic })
719 {
720 tracing::warn!("Failed to unregister bar type: {e}");
721 }
722
723 self.spawn_ws(
724 async move {
725 ws.unsubscribe_candles(instrument_id, resolution)
726 .await
727 .context("candles unsubscription")
728 },
729 "dYdX candles unsubscription",
730 );
731
732 Ok(())
733 }
734
735 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
736 let instruments_cache = self.instruments.clone();
737 let sender = self.data_sender.clone();
738 let http = self.http_client.clone();
739 let instrument_id = request.instrument_id;
740 let request_id = request.request_id;
741 let client_id = request.client_id.unwrap_or(self.client_id);
742 let start = request.start;
743 let end = request.end;
744 let params = request.params.clone();
745 let clock = self.clock;
746 let start_nanos = datetime_to_unix_nanos(start);
747 let end_nanos = datetime_to_unix_nanos(end);
748
749 tokio::spawn(async move {
750 let symbol = Ustr::from(instrument_id.symbol.as_str());
752 let instrument = if let Some(cached) = instruments_cache.get(&symbol) {
753 tracing::debug!("Found instrument {instrument_id} in cache");
754 Some(cached.clone())
755 } else {
756 tracing::debug!("Instrument {instrument_id} not in cache, fetching from API");
758 match http.request_instruments(None, None, None).await {
759 Ok(instruments) => {
760 for inst in &instruments {
762 upsert_instrument(&instruments_cache, inst.clone());
763 }
764 instruments.into_iter().find(|i| i.id() == instrument_id)
766 }
767 Err(e) => {
768 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
769 None
770 }
771 }
772 };
773
774 if let Some(inst) = instrument {
775 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
776 request_id,
777 client_id,
778 instrument_id,
779 inst,
780 start_nanos,
781 end_nanos,
782 clock.get_time_ns(),
783 params,
784 )));
785
786 if let Err(e) = sender.send(DataEvent::Response(response)) {
787 tracing::error!("Failed to send instrument response: {e}");
788 }
789 } else {
790 tracing::error!("Instrument {instrument_id} not found");
791 }
792 });
793
794 Ok(())
795 }
796
797 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
798 let http = self.http_client.clone();
799 let sender = self.data_sender.clone();
800 let instruments_cache = self.instruments.clone();
801 let request_id = request.request_id;
802 let client_id = request.client_id.unwrap_or(self.client_id);
803 let venue = self.venue();
804 let start = request.start;
805 let end = request.end;
806 let params = request.params.clone();
807 let clock = self.clock;
808 let start_nanos = datetime_to_unix_nanos(start);
809 let end_nanos = datetime_to_unix_nanos(end);
810
811 tokio::spawn(async move {
812 match http.request_instruments(None, None, None).await {
813 Ok(instruments) => {
814 tracing::info!("Fetched {} instruments from dYdX", instruments.len());
815
816 for instrument in &instruments {
818 upsert_instrument(&instruments_cache, instrument.clone());
819 }
820
821 let response = DataResponse::Instruments(InstrumentsResponse::new(
822 request_id,
823 client_id,
824 venue,
825 instruments,
826 start_nanos,
827 end_nanos,
828 clock.get_time_ns(),
829 params,
830 ));
831
832 if let Err(e) = sender.send(DataEvent::Response(response)) {
833 tracing::error!("Failed to send instruments response: {e}");
834 }
835 }
836 Err(e) => {
837 tracing::error!("Failed to fetch instruments from dYdX: {e:?}");
838
839 let response = DataResponse::Instruments(InstrumentsResponse::new(
841 request_id,
842 client_id,
843 venue,
844 Vec::new(),
845 start_nanos,
846 end_nanos,
847 clock.get_time_ns(),
848 params,
849 ));
850
851 if let Err(e) = sender.send(DataEvent::Response(response)) {
852 tracing::error!("Failed to send empty instruments response: {e}");
853 }
854 }
855 }
856 });
857
858 Ok(())
859 }
860
861 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
862 use nautilus_model::{
863 data::TradeTick,
864 enums::{AggressorSide, OrderSide},
865 identifiers::TradeId,
866 };
867
868 let http = self.http_client.clone();
869 let instruments = self.instruments.clone();
870 let sender = self.data_sender.clone();
871 let instrument_id = request.instrument_id;
872 let start = request.start;
873 let end = request.end;
874 let limit = request.limit.map(|n| n.get() as u32);
875 let request_id = request.request_id;
876 let client_id = request.client_id.unwrap_or(self.client_id);
877 let params = request.params.clone();
878 let clock = self.clock;
879 let start_nanos = datetime_to_unix_nanos(start);
880 let end_nanos = datetime_to_unix_nanos(end);
881
882 tokio::spawn(async move {
883 let ticker = instrument_id
887 .symbol
888 .as_str()
889 .trim_end_matches("-PERP")
890 .to_string();
891
892 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
894 Some(inst) => inst.clone(),
895 None => {
896 tracing::error!(
897 "request_trades: instrument {} not found in cache; cannot convert trades",
898 instrument_id
899 );
900 let ts_now = clock.get_time_ns();
901 let response = DataResponse::Trades(TradesResponse::new(
902 request_id,
903 client_id,
904 instrument_id,
905 Vec::new(),
906 start_nanos,
907 end_nanos,
908 ts_now,
909 params,
910 ));
911 if let Err(e) = sender.send(DataEvent::Response(response)) {
912 tracing::error!("Failed to send empty trades response: {e}");
913 }
914 return;
915 }
916 };
917
918 let price_precision = instrument.price_precision();
919 let size_precision = instrument.size_precision();
920
921 match http
922 .inner
923 .get_trades(&ticker, limit)
924 .await
925 .context("failed to request trades from dYdX")
926 {
927 Ok(trades_response) => {
928 let mut ticks = Vec::new();
929
930 for trade in trades_response.trades {
931 let aggressor_side = match trade.side {
932 OrderSide::Buy => AggressorSide::Buyer,
933 OrderSide::Sell => AggressorSide::Seller,
934 _ => continue, };
936
937 let price = match Price::from_decimal_dp(trade.price, price_precision) {
938 Ok(p) => p,
939 Err(e) => {
940 tracing::warn!(
941 "request_trades: failed to convert price for trade {}: {e}",
942 trade.id
943 );
944 continue;
945 }
946 };
947
948 let size = match Quantity::from_decimal_dp(trade.size, size_precision) {
949 Ok(q) => q,
950 Err(e) => {
951 tracing::warn!(
952 "request_trades: failed to convert size for trade {}: {e}",
953 trade.id
954 );
955 continue;
956 }
957 };
958
959 let ts_event = match trade.created_at.timestamp_nanos_opt() {
960 Some(ns) if ns >= 0 => UnixNanos::from(ns as u64),
961 _ => {
962 tracing::warn!(
963 "request_trades: timestamp out of range for trade {}",
964 trade.id
965 );
966 continue;
967 }
968 };
969
970 if let Some(start_ts) = start_nanos
972 && ts_event < start_ts
973 {
974 continue;
975 }
976 if let Some(end_ts) = end_nanos
977 && ts_event > end_ts
978 {
979 continue;
980 }
981
982 let tick = TradeTick::new(
983 instrument_id,
984 price,
985 size,
986 aggressor_side,
987 TradeId::new(&trade.id),
988 ts_event,
989 clock.get_time_ns(),
990 );
991 ticks.push(tick);
992 }
993
994 let response = DataResponse::Trades(TradesResponse::new(
995 request_id,
996 client_id,
997 instrument_id,
998 ticks,
999 start_nanos,
1000 end_nanos,
1001 clock.get_time_ns(),
1002 params,
1003 ));
1004
1005 if let Err(e) = sender.send(DataEvent::Response(response)) {
1006 tracing::error!("Failed to send trades response: {e}");
1007 }
1008 }
1009 Err(e) => {
1010 tracing::error!("Trade request failed for {}: {e:?}", instrument_id);
1011
1012 let response = DataResponse::Trades(TradesResponse::new(
1013 request_id,
1014 client_id,
1015 instrument_id,
1016 Vec::new(),
1017 start_nanos,
1018 end_nanos,
1019 clock.get_time_ns(),
1020 params,
1021 ));
1022
1023 if let Err(e) = sender.send(DataEvent::Response(response)) {
1024 tracing::error!("Failed to send empty trades response: {e}");
1025 }
1026 }
1027 }
1028 });
1029
1030 Ok(())
1031 }
1032
1033 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
1034 use nautilus_model::enums::{AggregationSource, BarAggregation, PriceType};
1035
1036 const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
1037
1038 let bar_type = request.bar_type;
1039 let spec = bar_type.spec();
1040
1041 if bar_type.aggregation_source() != AggregationSource::External {
1043 anyhow::bail!(
1044 "dYdX only supports EXTERNAL aggregation, got {:?}",
1045 bar_type.aggregation_source()
1046 );
1047 }
1048
1049 if spec.price_type != PriceType::Last {
1050 anyhow::bail!(
1051 "dYdX only supports LAST price type, got {:?}",
1052 spec.price_type
1053 );
1054 }
1055
1056 let resolution = match spec.step.get() {
1058 1 => match spec.aggregation {
1059 BarAggregation::Minute => "1MIN",
1060 BarAggregation::Hour => "1HOUR",
1061 BarAggregation::Day => "1DAY",
1062 _ => {
1063 anyhow::bail!("Unsupported bar aggregation: {:?}", spec.aggregation);
1064 }
1065 },
1066 5 if spec.aggregation == BarAggregation::Minute => "5MINS",
1067 15 if spec.aggregation == BarAggregation::Minute => "15MINS",
1068 30 if spec.aggregation == BarAggregation::Minute => "30MINS",
1069 4 if spec.aggregation == BarAggregation::Hour => "4HOURS",
1070 step => {
1071 anyhow::bail!("Unsupported bar step: {step}");
1072 }
1073 };
1074
1075 let http = self.http_client.clone();
1076 let instruments = self.instruments.clone();
1077 let sender = self.data_sender.clone();
1078 let instrument_id = bar_type.instrument_id();
1079 let symbol = instrument_id
1081 .symbol
1082 .as_str()
1083 .trim_end_matches("-PERP")
1084 .to_string();
1085 let request_id = request.request_id;
1086 let client_id = request.client_id.unwrap_or(self.client_id);
1087 let params = request.params.clone();
1088 let clock = self.clock;
1089
1090 let start = request.start;
1091 let end = request.end;
1092 let overall_limit = request.limit.map(|n| n.get() as u32);
1093
1094 let start_nanos = datetime_to_unix_nanos(start);
1096 let end_nanos = datetime_to_unix_nanos(end);
1097
1098 let resolution_enum = match resolution {
1100 "1MIN" => crate::common::enums::DydxCandleResolution::OneMinute,
1101 "5MINS" => crate::common::enums::DydxCandleResolution::FiveMinutes,
1102 "15MINS" => crate::common::enums::DydxCandleResolution::FifteenMinutes,
1103 "30MINS" => crate::common::enums::DydxCandleResolution::ThirtyMinutes,
1104 "1HOUR" => crate::common::enums::DydxCandleResolution::OneHour,
1105 "4HOURS" => crate::common::enums::DydxCandleResolution::FourHours,
1106 "1DAY" => crate::common::enums::DydxCandleResolution::OneDay,
1107 _ => {
1108 anyhow::bail!("Unsupported resolution: {resolution}");
1109 }
1110 };
1111
1112 tokio::spawn(async move {
1113 let bar_secs: i64 = match spec.aggregation {
1115 BarAggregation::Minute => spec.step.get() as i64 * 60,
1116 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1117 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1118 _ => {
1119 tracing::error!(
1120 "Unsupported aggregation for request_bars: {:?}",
1121 spec.aggregation
1122 );
1123 return;
1124 }
1125 };
1126
1127 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1129 Some(inst) => inst.clone(),
1130 None => {
1131 tracing::error!(
1132 "request_bars: instrument {} not found in cache; cannot convert candles",
1133 instrument_id
1134 );
1135 let ts_now = clock.get_time_ns();
1136 let response = DataResponse::Bars(BarsResponse::new(
1137 request_id,
1138 client_id,
1139 bar_type,
1140 Vec::new(),
1141 start_nanos,
1142 end_nanos,
1143 ts_now,
1144 params,
1145 ));
1146 if let Err(e) = sender.send(DataEvent::Response(response)) {
1147 tracing::error!("Failed to send empty bars response: {e}");
1148 }
1149 return;
1150 }
1151 };
1152
1153 let price_precision = instrument.price_precision();
1154 let size_precision = instrument.size_precision();
1155
1156 let mut all_bars: Vec<Bar> = Vec::new();
1157
1158 let (range_start, range_end) = match (start, end) {
1160 (Some(s), Some(e)) if e > s => (s, e),
1161 _ => {
1162 let limit = overall_limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1163 match http
1164 .inner
1165 .get_candles(&symbol, resolution_enum, Some(limit), None, None)
1166 .await
1167 {
1168 Ok(candles_response) => {
1169 tracing::debug!(
1170 "request_bars fetched {} candles without explicit date range",
1171 candles_response.candles.len()
1172 );
1173
1174 for candle in &candles_response.candles {
1175 match Self::candle_to_bar(
1176 candle,
1177 bar_type,
1178 price_precision,
1179 size_precision,
1180 bar_secs,
1181 clock,
1182 ) {
1183 Ok(bar) => all_bars.push(bar),
1184 Err(e) => {
1185 tracing::warn!(
1186 "Failed to convert dYdX candle to bar for {}: {e}",
1187 instrument_id
1188 );
1189 }
1190 }
1191 }
1192
1193 let current_time_ns = clock.get_time_ns();
1194 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1195
1196 let response = DataResponse::Bars(BarsResponse::new(
1197 request_id,
1198 client_id,
1199 bar_type,
1200 all_bars,
1201 start_nanos,
1202 end_nanos,
1203 current_time_ns,
1204 params,
1205 ));
1206
1207 if let Err(e) = sender.send(DataEvent::Response(response)) {
1208 tracing::error!("Failed to send bars response: {e}");
1209 }
1210 }
1211 Err(e) => {
1212 tracing::error!(
1213 "Failed to request candles for {symbol} without date range: {e:?}"
1214 );
1215 }
1216 }
1217 return;
1218 }
1219 };
1220
1221 let total_secs = (range_end - range_start).num_seconds().max(0);
1223 let expected_bars = (total_secs / bar_secs).max(1) as u64;
1224
1225 tracing::debug!(
1226 "request_bars range {:?} -> {:?}, expected_bars ~= {}",
1227 range_start,
1228 range_end,
1229 expected_bars
1230 );
1231
1232 let mut remaining = overall_limit.unwrap_or(u32::MAX);
1233
1234 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1236 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1237
1238 let mut chunk_start = range_start;
1239
1240 while chunk_start < range_end && remaining > 0 {
1241 let mut chunk_end = chunk_start + chunk_duration;
1242 if chunk_end > range_end {
1243 chunk_end = range_end;
1244 }
1245
1246 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1247
1248 tracing::debug!(
1249 "request_bars chunk: {} -> {}, limit={}",
1250 chunk_start,
1251 chunk_end,
1252 per_call_limit
1253 );
1254
1255 match http
1256 .inner
1257 .get_candles(
1258 &symbol,
1259 resolution_enum,
1260 Some(per_call_limit),
1261 Some(chunk_start),
1262 Some(chunk_end),
1263 )
1264 .await
1265 {
1266 Ok(candles_response) => {
1267 let count = candles_response.candles.len() as u32;
1268
1269 if count == 0 {
1270 break;
1272 }
1273
1274 for candle in &candles_response.candles {
1276 match Self::candle_to_bar(
1277 candle,
1278 bar_type,
1279 price_precision,
1280 size_precision,
1281 bar_secs,
1282 clock,
1283 ) {
1284 Ok(bar) => all_bars.push(bar),
1285 Err(e) => {
1286 tracing::warn!(
1287 "Failed to convert dYdX candle to bar for {}: {e}",
1288 instrument_id
1289 );
1290 }
1291 }
1292 }
1293
1294 if remaining <= count {
1295 break;
1296 } else {
1297 remaining -= count;
1298 }
1299 }
1300 Err(e) => {
1301 tracing::error!(
1302 "Failed to request candles for {symbol} in chunk {:?} -> {:?}: {e:?}",
1303 chunk_start,
1304 chunk_end
1305 );
1306 break;
1307 }
1308 }
1309
1310 chunk_start += chunk_duration;
1311 }
1312
1313 tracing::debug!("request_bars completed partitioned fetch for {}", bar_type);
1314
1315 let current_time_ns = clock.get_time_ns();
1317 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1318
1319 tracing::debug!(
1320 "request_bars filtered to {} completed bars (current_time_ns={})",
1321 all_bars.len(),
1322 current_time_ns
1323 );
1324
1325 let response = DataResponse::Bars(BarsResponse::new(
1326 request_id,
1327 client_id,
1328 bar_type,
1329 all_bars,
1330 start_nanos,
1331 end_nanos,
1332 current_time_ns,
1333 params,
1334 ));
1335
1336 if let Err(e) = sender.send(DataEvent::Response(response)) {
1337 tracing::error!("Failed to send bars response: {e}");
1338 }
1339 });
1340
1341 Ok(())
1342 }
1343}
1344
1345fn upsert_instrument(cache: &Arc<DashMap<Ustr, InstrumentAny>>, instrument: InstrumentAny) {
1347 let symbol = Ustr::from(instrument.id().symbol.as_str());
1348 cache.insert(symbol, instrument);
1349}
1350
1351fn datetime_to_unix_nanos(value: Option<DateTime<Utc>>) -> Option<UnixNanos> {
1353 value
1354 .and_then(|dt| dt.timestamp_nanos_opt())
1355 .and_then(|nanos| u64::try_from(nanos).ok())
1356 .map(UnixNanos::from)
1357}
1358
1359impl DydxDataClient {
1360 pub fn start_instrument_refresh_task(&mut self) -> anyhow::Result<()> {
1369 let interval_secs = match self.config.instrument_refresh_interval_secs {
1370 Some(secs) if secs > 0 => secs,
1371 _ => {
1372 tracing::info!("Instrument refresh disabled (interval not configured)");
1373 return Ok(());
1374 }
1375 };
1376
1377 let interval = Duration::from_secs(interval_secs);
1378 let http_client = self.http_client.clone();
1379 let instruments_cache = self.instruments.clone();
1380 let cancellation_token = self.cancellation_token.clone();
1381
1382 tracing::info!(
1383 "Starting instrument refresh task (interval: {}s)",
1384 interval_secs
1385 );
1386
1387 let task = tokio::spawn(async move {
1388 let mut interval_timer = tokio::time::interval(interval);
1389 interval_timer.tick().await; loop {
1392 tokio::select! {
1393 _ = cancellation_token.cancelled() => {
1394 tracing::info!("Instrument refresh task cancelled");
1395 break;
1396 }
1397 _ = interval_timer.tick() => {
1398 tracing::debug!("Refreshing instruments");
1399
1400 match http_client.request_instruments(None, None, None).await {
1401 Ok(instruments) => {
1402 tracing::debug!("Refreshed {} instruments", instruments.len());
1403
1404 for instrument in instruments {
1406 upsert_instrument(&instruments_cache, instrument);
1407 }
1408
1409 let all_instruments: Vec<_> = instruments_cache
1411 .iter()
1412 .map(|entry| entry.value().clone())
1413 .collect();
1414 http_client.cache_instruments(all_instruments);
1415 }
1416 Err(e) => {
1417 tracing::error!("Failed to refresh instruments: {}", e);
1418 }
1419 }
1420 }
1421 }
1422 }
1423 });
1424
1425 self.tasks.push(task);
1426 Ok(())
1427 }
1428
1429 fn start_orderbook_refresh_task(&mut self) -> anyhow::Result<()> {
1439 let interval_secs = match self.config.orderbook_refresh_interval_secs {
1440 Some(secs) if secs > 0 => secs,
1441 _ => {
1442 tracing::info!("Orderbook snapshot refresh disabled (interval not configured)");
1443 return Ok(());
1444 }
1445 };
1446
1447 let interval = Duration::from_secs(interval_secs);
1448 let http_client = self.http_client.clone();
1449 let instruments = self.instruments.clone();
1450 let order_books = self.order_books.clone();
1451 let active_subs = self.active_orderbook_subs.clone();
1452 let cancellation_token = self.cancellation_token.clone();
1453 let data_sender = self.data_sender.clone();
1454
1455 tracing::info!(
1456 "Starting orderbook snapshot refresh task (interval: {}s)",
1457 interval_secs
1458 );
1459
1460 let task = tokio::spawn(async move {
1461 let mut interval_timer = tokio::time::interval(interval);
1462 interval_timer.tick().await; loop {
1465 tokio::select! {
1466 _ = cancellation_token.cancelled() => {
1467 tracing::info!("Orderbook refresh task cancelled");
1468 break;
1469 }
1470 _ = interval_timer.tick() => {
1471 let active_instruments: Vec<InstrumentId> = active_subs
1472 .iter()
1473 .map(|entry| *entry.key())
1474 .collect();
1475
1476 if active_instruments.is_empty() {
1477 tracing::debug!("No active orderbook subscriptions to refresh");
1478 continue;
1479 }
1480
1481 tracing::debug!(
1482 "Refreshing {} orderbook snapshots",
1483 active_instruments.len()
1484 );
1485
1486 for instrument_id in active_instruments {
1487 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
1489 Some(inst) => inst.clone(),
1490 None => {
1491 tracing::warn!(
1492 "Cannot refresh orderbook: no instrument for {}",
1493 instrument_id
1494 );
1495 continue;
1496 }
1497 };
1498
1499 let symbol = instrument_id.symbol.as_str().trim_end_matches("-PERP");
1501 let snapshot_result = http_client.inner.get_orderbook(symbol).await;
1502
1503 let snapshot = match snapshot_result {
1504 Ok(s) => s,
1505 Err(e) => {
1506 tracing::error!(
1507 "Failed to fetch orderbook snapshot for {}: {}",
1508 instrument_id,
1509 e
1510 );
1511 continue;
1512 }
1513 };
1514
1515 let deltas_result = Self::parse_orderbook_snapshot(
1517 instrument_id,
1518 &snapshot,
1519 &instrument,
1520 );
1521
1522 let deltas = match deltas_result {
1523 Ok(d) => d,
1524 Err(e) => {
1525 tracing::error!(
1526 "Failed to parse orderbook snapshot for {}: {}",
1527 instrument_id,
1528 e
1529 );
1530 continue;
1531 }
1532 };
1533
1534 if let Some(mut book) = order_books.get_mut(&instrument_id) {
1536 if let Err(e) = book.apply_deltas(&deltas) {
1537 tracing::error!(
1538 "Failed to apply orderbook snapshot for {}: {}",
1539 instrument_id,
1540 e
1541 );
1542 continue;
1543 }
1544
1545 tracing::debug!(
1546 "Refreshed orderbook snapshot for {} (bid={:?}, ask={:?})",
1547 instrument_id,
1548 book.best_bid_price(),
1549 book.best_ask_price()
1550 );
1551 }
1552
1553 let data = NautilusData::from(OrderBookDeltas_API::new(deltas));
1555 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1556 tracing::error!("Failed to emit orderbook snapshot: {}", e);
1557 }
1558 }
1559 }
1560 }
1561 }
1562 });
1563
1564 self.tasks.push(task);
1565 Ok(())
1566 }
1567
1568 fn parse_orderbook_snapshot(
1572 instrument_id: InstrumentId,
1573 snapshot: &crate::http::models::OrderbookResponse,
1574 instrument: &InstrumentAny,
1575 ) -> anyhow::Result<OrderBookDeltas> {
1576 use nautilus_model::{
1577 data::{BookOrder, OrderBookDelta},
1578 enums::{BookAction, OrderSide, RecordFlag},
1579 instruments::Instrument,
1580 types::{Price, Quantity},
1581 };
1582
1583 let ts_init = get_atomic_clock_realtime().get_time_ns();
1584 let mut deltas = Vec::new();
1585
1586 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1588
1589 let price_precision = instrument.price_precision();
1590 let size_precision = instrument.size_precision();
1591
1592 let bids_len = snapshot.bids.len();
1593 let asks_len = snapshot.asks.len();
1594
1595 for (idx, bid) in snapshot.bids.iter().enumerate() {
1597 let is_last = idx == bids_len - 1 && asks_len == 0;
1598 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1599
1600 let price = Price::from_decimal_dp(bid.price, price_precision)
1601 .context("failed to parse bid price")?;
1602 let size = Quantity::from_decimal_dp(bid.size, size_precision)
1603 .context("failed to parse bid size")?;
1604
1605 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
1606 deltas.push(OrderBookDelta::new(
1607 instrument_id,
1608 BookAction::Add,
1609 order,
1610 flags,
1611 0,
1612 ts_init,
1613 ts_init,
1614 ));
1615 }
1616
1617 for (idx, ask) in snapshot.asks.iter().enumerate() {
1619 let is_last = idx == asks_len - 1;
1620 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1621
1622 let price = Price::from_decimal_dp(ask.price, price_precision)
1623 .context("failed to parse ask price")?;
1624 let size = Quantity::from_decimal_dp(ask.size, size_precision)
1625 .context("failed to parse ask size")?;
1626
1627 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
1628 deltas.push(OrderBookDelta::new(
1629 instrument_id,
1630 BookAction::Add,
1631 order,
1632 flags,
1633 0,
1634 ts_init,
1635 ts_init,
1636 ));
1637 }
1638
1639 Ok(OrderBookDeltas::new(instrument_id, deltas))
1640 }
1641
1642 #[must_use]
1644 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
1645 self.instruments.get(&Ustr::from(symbol)).map(|i| i.clone())
1646 }
1647
1648 #[must_use]
1650 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
1651 self.instruments.iter().map(|i| i.clone()).collect()
1652 }
1653
1654 fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1655 self.order_books
1656 .entry(instrument_id)
1657 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1658 }
1659
1660 #[must_use]
1662 pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1663 self.bar_type_mappings
1664 .get(topic)
1665 .map(|entry| *entry.value())
1666 }
1667
1668 #[must_use]
1670 pub fn get_bar_topics(&self) -> Vec<String> {
1671 self.bar_type_mappings
1672 .iter()
1673 .map(|entry| entry.key().clone())
1674 .collect()
1675 }
1676
1677 fn candle_to_bar(
1684 candle: &crate::http::models::Candle,
1685 bar_type: BarType,
1686 price_precision: u8,
1687 size_precision: u8,
1688 bar_secs: i64,
1689 clock: &AtomicTime,
1690 ) -> anyhow::Result<Bar> {
1691 use anyhow::Context;
1692
1693 let ts_init =
1695 datetime_to_unix_nanos(Some(candle.started_at)).unwrap_or_else(|| clock.get_time_ns());
1696
1697 let ts_event_ns = ts_init
1699 .as_u64()
1700 .saturating_add((bar_secs as u64).saturating_mul(1_000_000_000));
1701 let ts_event = UnixNanos::from(ts_event_ns);
1702
1703 let open = Price::from_decimal_dp(candle.open, price_precision)
1704 .context("failed to parse candle open price")?;
1705 let high = Price::from_decimal_dp(candle.high, price_precision)
1706 .context("failed to parse candle high price")?;
1707 let low = Price::from_decimal_dp(candle.low, price_precision)
1708 .context("failed to parse candle low price")?;
1709 let close = Price::from_decimal_dp(candle.close, price_precision)
1710 .context("failed to parse candle close price")?;
1711
1712 let volume = Quantity::from_decimal_dp(candle.base_token_volume, size_precision)
1714 .context("failed to parse candle base_token_volume")?;
1715
1716 Ok(Bar::new(
1717 bar_type, open, high, low, close, volume, ts_event, ts_init,
1718 ))
1719 }
1720
1721 fn handle_ws_message(
1722 message: crate::websocket::enums::NautilusWsMessage,
1723 ctx: &WsMessageContext,
1724 ) {
1725 match message {
1726 crate::websocket::enums::NautilusWsMessage::Data(payloads) => {
1727 Self::handle_data_message(payloads, ctx.data_sender, ctx.incomplete_bars);
1728 }
1729 crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
1730 Self::handle_deltas_message(
1731 *deltas,
1732 ctx.data_sender,
1733 ctx.order_books,
1734 ctx.last_quotes,
1735 ctx.instruments,
1736 );
1737 }
1738 crate::websocket::enums::NautilusWsMessage::OraclePrices(oracle_prices) => {
1739 Self::handle_oracle_prices(oracle_prices, ctx.instruments, ctx.data_sender);
1740 }
1741 crate::websocket::enums::NautilusWsMessage::Error(err) => {
1742 tracing::error!("dYdX WS error: {err}");
1743 }
1744 crate::websocket::enums::NautilusWsMessage::Reconnected => {
1745 tracing::info!("dYdX WS reconnected - re-subscribing to active subscriptions");
1746
1747 if let Some(ws) = ctx.ws_client {
1749 let total_subs = ctx.active_orderbook_subs.len()
1750 + ctx.active_trade_subs.len()
1751 + ctx.active_bar_subs.len();
1752
1753 if total_subs == 0 {
1754 tracing::debug!("No active subscriptions to restore");
1755 return;
1756 }
1757
1758 tracing::info!(
1759 "Restoring {} subscriptions (orderbook={}, trades={}, bars={})",
1760 total_subs,
1761 ctx.active_orderbook_subs.len(),
1762 ctx.active_trade_subs.len(),
1763 ctx.active_bar_subs.len()
1764 );
1765
1766 for entry in ctx.active_orderbook_subs.iter() {
1768 let instrument_id = *entry.key();
1769 let ws_clone = ws.clone();
1770 tokio::spawn(async move {
1771 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1772 tracing::error!(
1773 "Failed to re-subscribe to orderbook for {instrument_id}: {e:?}"
1774 );
1775 } else {
1776 tracing::debug!("Re-subscribed to orderbook for {instrument_id}");
1777 }
1778 });
1779 }
1780
1781 for entry in ctx.active_trade_subs.iter() {
1783 let instrument_id = *entry.key();
1784 let ws_clone = ws.clone();
1785 tokio::spawn(async move {
1786 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1787 tracing::error!(
1788 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1789 );
1790 } else {
1791 tracing::debug!("Re-subscribed to trades for {instrument_id}");
1792 }
1793 });
1794 }
1795
1796 for entry in ctx.active_bar_subs.iter() {
1798 let (instrument_id, resolution) = entry.key();
1799 let instrument_id = *instrument_id;
1800 let resolution = resolution.clone();
1801 let bar_type = *entry.value();
1802 let ws_clone = ws.clone();
1803
1804 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1806 let topic = format!("{ticker}/{resolution}");
1807 if let Err(e) = ws.send_command(
1808 crate::websocket::handler::HandlerCommand::RegisterBarType {
1809 topic,
1810 bar_type,
1811 },
1812 ) {
1813 tracing::warn!(
1814 "Failed to re-register bar type for {instrument_id} ({resolution}): {e}"
1815 );
1816 }
1817
1818 tokio::spawn(async move {
1819 if let Err(e) =
1820 ws_clone.subscribe_candles(instrument_id, &resolution).await
1821 {
1822 tracing::error!(
1823 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1824 );
1825 } else {
1826 tracing::debug!(
1827 "Re-subscribed to candles for {instrument_id} ({resolution})"
1828 );
1829 }
1830 });
1831 }
1832
1833 tracing::info!("Completed re-subscription requests after reconnection");
1834 } else {
1835 tracing::warn!("WebSocket client not available for re-subscription");
1836 }
1837 }
1838 crate::websocket::enums::NautilusWsMessage::Order(_)
1839 | crate::websocket::enums::NautilusWsMessage::Fill(_)
1840 | crate::websocket::enums::NautilusWsMessage::Position(_)
1841 | crate::websocket::enums::NautilusWsMessage::AccountState(_)
1842 | crate::websocket::enums::NautilusWsMessage::SubaccountSubscribed(_)
1843 | crate::websocket::enums::NautilusWsMessage::SubaccountsChannelData(_) => {
1844 tracing::debug!(
1845 "Ignoring execution/subaccount message on dYdX data client (handled by execution adapter)"
1846 );
1847 }
1848 }
1849 }
1850
1851 fn handle_data_message(
1852 payloads: Vec<NautilusData>,
1853 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1854 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1855 ) {
1856 for data in payloads {
1857 if let NautilusData::Bar(bar) = data {
1859 Self::handle_bar_message(bar, data_sender, incomplete_bars);
1860 } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1861 tracing::error!("Failed to emit data event: {e}");
1862 }
1863 }
1864 }
1865
1866 fn handle_bar_message(
1873 bar: Bar,
1874 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1875 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1876 ) {
1877 let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1878 let bar_type = bar.bar_type;
1879
1880 if bar.ts_event <= current_time_ns {
1881 incomplete_bars.remove(&bar_type);
1883 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1884 tracing::error!("Failed to emit completed bar: {e}");
1885 }
1886 } else {
1887 tracing::trace!(
1889 "Caching incomplete bar for {} (ts_event={}, current={})",
1890 bar_type,
1891 bar.ts_event,
1892 current_time_ns
1893 );
1894 incomplete_bars.insert(bar_type, bar);
1895 }
1896 }
1897
1898 fn resolve_crossed_order_book(
1916 book: &mut OrderBook,
1917 venue_deltas: OrderBookDeltas,
1918 instrument: &InstrumentAny,
1919 ) -> anyhow::Result<OrderBookDeltas> {
1920 let instrument_id = venue_deltas.instrument_id;
1921 let ts_init = venue_deltas.ts_init;
1922 let mut all_deltas = venue_deltas.deltas.clone();
1923
1924 book.apply_deltas(&venue_deltas)?;
1926
1927 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1929 (book.best_bid_price(), book.best_ask_price())
1930 {
1931 bid_price >= ask_price
1932 } else {
1933 false
1934 };
1935
1936 while is_crossed {
1938 tracing::debug!(
1939 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1940 instrument_id,
1941 book.best_bid_price(),
1942 book.best_ask_price()
1943 );
1944
1945 let bid_price = match book.best_bid_price() {
1946 Some(p) => p,
1947 None => break,
1948 };
1949 let ask_price = match book.best_ask_price() {
1950 Some(p) => p,
1951 None => break,
1952 };
1953 let bid_size = match book.best_bid_size() {
1954 Some(s) => s,
1955 None => break,
1956 };
1957 let ask_size = match book.best_ask_size() {
1958 Some(s) => s,
1959 None => break,
1960 };
1961
1962 let mut temp_deltas = Vec::new();
1963
1964 if bid_size > ask_size {
1965 let new_bid_size = Quantity::new(
1967 bid_size.as_f64() - ask_size.as_f64(),
1968 instrument.size_precision(),
1969 );
1970 temp_deltas.push(OrderBookDelta::new(
1971 instrument_id,
1972 BookAction::Update,
1973 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1974 0,
1975 0,
1976 ts_init,
1977 ts_init,
1978 ));
1979 temp_deltas.push(OrderBookDelta::new(
1980 instrument_id,
1981 BookAction::Delete,
1982 BookOrder::new(
1983 OrderSide::Sell,
1984 ask_price,
1985 Quantity::new(0.0, instrument.size_precision()),
1986 0,
1987 ),
1988 0,
1989 0,
1990 ts_init,
1991 ts_init,
1992 ));
1993 } else if bid_size < ask_size {
1994 let new_ask_size = Quantity::new(
1996 ask_size.as_f64() - bid_size.as_f64(),
1997 instrument.size_precision(),
1998 );
1999 temp_deltas.push(OrderBookDelta::new(
2000 instrument_id,
2001 BookAction::Update,
2002 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
2003 0,
2004 0,
2005 ts_init,
2006 ts_init,
2007 ));
2008 temp_deltas.push(OrderBookDelta::new(
2009 instrument_id,
2010 BookAction::Delete,
2011 BookOrder::new(
2012 OrderSide::Buy,
2013 bid_price,
2014 Quantity::new(0.0, instrument.size_precision()),
2015 0,
2016 ),
2017 0,
2018 0,
2019 ts_init,
2020 ts_init,
2021 ));
2022 } else {
2023 temp_deltas.push(OrderBookDelta::new(
2025 instrument_id,
2026 BookAction::Delete,
2027 BookOrder::new(
2028 OrderSide::Buy,
2029 bid_price,
2030 Quantity::new(0.0, instrument.size_precision()),
2031 0,
2032 ),
2033 0,
2034 0,
2035 ts_init,
2036 ts_init,
2037 ));
2038 temp_deltas.push(OrderBookDelta::new(
2039 instrument_id,
2040 BookAction::Delete,
2041 BookOrder::new(
2042 OrderSide::Sell,
2043 ask_price,
2044 Quantity::new(0.0, instrument.size_precision()),
2045 0,
2046 ),
2047 0,
2048 0,
2049 ts_init,
2050 ts_init,
2051 ));
2052 }
2053
2054 let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
2056 book.apply_deltas(&temp_deltas_obj)?;
2057 all_deltas.extend(temp_deltas);
2058
2059 is_crossed = if let (Some(bid_price), Some(ask_price)) =
2061 (book.best_bid_price(), book.best_ask_price())
2062 {
2063 bid_price >= ask_price
2064 } else {
2065 false
2066 };
2067 }
2068
2069 if let Some(last_delta) = all_deltas.last_mut() {
2071 last_delta.flags = RecordFlag::F_LAST as u8;
2072 }
2073
2074 Ok(OrderBookDeltas::new(instrument_id, all_deltas))
2075 }
2076
2077 fn handle_deltas_message(
2078 deltas: OrderBookDeltas,
2079 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2080 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
2081 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
2082 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2083 ) {
2084 let instrument_id = deltas.instrument_id;
2085
2086 let instrument = match instruments.get(&Ustr::from(instrument_id.symbol.as_ref())) {
2088 Some(inst) => inst.clone(),
2089 None => {
2090 tracing::error!(
2091 "Cannot resolve crossed order book: no instrument for {instrument_id}"
2092 );
2093 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
2095 OrderBookDeltas_API::new(deltas),
2096 ))) {
2097 tracing::error!("Failed to emit order book deltas: {e}");
2098 }
2099 return;
2100 }
2101 };
2102
2103 let mut book = order_books
2105 .entry(instrument_id)
2106 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
2107
2108 let resolved_deltas = match Self::resolve_crossed_order_book(&mut book, deltas, &instrument)
2110 {
2111 Ok(d) => d,
2112 Err(e) => {
2113 tracing::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
2114 return;
2115 }
2116 };
2117
2118 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
2121 (book.best_bid_price(), book.best_ask_price())
2122 && let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size())
2123 {
2124 Some(QuoteTick::new(
2125 instrument_id,
2126 bid_price,
2127 ask_price,
2128 bid_size,
2129 ask_size,
2130 resolved_deltas.ts_event,
2131 resolved_deltas.ts_init,
2132 ))
2133 } else {
2134 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
2136 tracing::debug!(
2137 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
2138 );
2139 last_quotes.get(&instrument_id).map(|q| *q)
2140 } else {
2141 None
2142 }
2143 };
2144
2145 if let Some(quote) = quote_opt {
2146 let emit_quote =
2148 !matches!(last_quotes.get(&instrument_id), Some(existing) if *existing == quote);
2149
2150 if emit_quote {
2151 last_quotes.insert(instrument_id, quote);
2152 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
2153 tracing::error!("Failed to emit quote tick: {e}");
2154 }
2155 }
2156 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
2157 tracing::debug!(
2159 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
2160 book.best_bid_price(),
2161 book.best_ask_price()
2162 );
2163 }
2164
2165 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
2167 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2168 tracing::error!("Failed to emit order book deltas event: {e}");
2169 }
2170 }
2171
2172 fn handle_oracle_prices(
2173 oracle_prices: std::collections::HashMap<
2174 String,
2175 crate::websocket::messages::DydxOraclePriceMarket,
2176 >,
2177 instruments: &Arc<DashMap<Ustr, InstrumentAny>>,
2178 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
2179 ) {
2180 let ts_init = get_atomic_clock_realtime().get_time_ns();
2181
2182 for (symbol_str, oracle_market) in oracle_prices {
2183 let symbol = Ustr::from(&symbol_str);
2184
2185 let Some(instrument) = instruments.get(&symbol) else {
2187 tracing::debug!(
2188 symbol = %symbol,
2189 "Received oracle price for unknown instrument (not cached yet)"
2190 );
2191 continue;
2192 };
2193
2194 let instrument_id = instrument.id();
2195
2196 let oracle_price_str = &oracle_market.oracle_price;
2198 let Ok(oracle_price_f64) = oracle_price_str.parse::<f64>() else {
2199 tracing::error!(
2200 symbol = %symbol,
2201 price_str = %oracle_price_str,
2202 "Failed to parse oracle price as f64"
2203 );
2204 continue;
2205 };
2206
2207 let price_precision = instrument.price_precision();
2208 let oracle_price = Price::from_raw(
2209 (oracle_price_f64 * 10_f64.powi(price_precision as i32)) as PriceRaw,
2210 price_precision,
2211 );
2212
2213 let oracle_price_event = DydxOraclePrice::new(
2214 instrument_id,
2215 oracle_price,
2216 ts_init, ts_init,
2218 );
2219
2220 tracing::debug!(
2221 instrument_id = %instrument_id,
2222 oracle_price = %oracle_price,
2223 "Received dYdX oracle price: {oracle_price_event:?}"
2224 );
2225
2226 let data = NautilusData::IndexPriceUpdate(IndexPriceUpdate::new(
2227 instrument_id,
2228 oracle_price,
2229 ts_init, ts_init,
2231 ));
2232
2233 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
2234 tracing::error!("Failed to emit oracle price: {e}");
2235 }
2236 }
2237 }
2238}
2239
2240#[cfg(test)]
2241mod tests {
2242 use std::{collections::HashMap, net::SocketAddr};
2243
2244 use axum::{
2245 Router,
2246 extract::{Path, Query, State},
2247 response::Json,
2248 routing::get,
2249 };
2250 use indexmap::IndexMap;
2251 use nautilus_common::{
2252 live::runner::set_data_event_sender,
2253 messages::{DataEvent, data::DataResponse},
2254 };
2255 use nautilus_core::UUID4;
2256 use nautilus_model::{
2257 data::{
2258 BarSpecification, BarType, Data as NautilusData, OrderBookDelta, OrderBookDeltas,
2259 TradeTick, order::BookOrder,
2260 },
2261 enums::{
2262 AggregationSource, AggressorSide, BarAggregation, BookAction, BookType, OrderSide,
2263 PriceType,
2264 },
2265 identifiers::{ClientId, InstrumentId, Symbol, Venue},
2266 instruments::{CryptoPerpetual, Instrument, InstrumentAny},
2267 orderbook::OrderBook,
2268 types::{Currency, Price, Quantity},
2269 };
2270 use rstest::rstest;
2271 use rust_decimal::Decimal;
2272 use rust_decimal_macros::dec;
2273 use tokio::net::TcpListener;
2274
2275 use super::*;
2276 use crate::http::models::{Candle, CandlesResponse};
2277
2278 fn setup_test_env() {
2279 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
2281 set_data_event_sender(sender);
2282 }
2283
2284 #[rstest]
2285 fn test_new_data_client() {
2286 setup_test_env();
2287
2288 let client_id = ClientId::from("DYDX-001");
2289 let config = DydxDataClientConfig::default();
2290 let http_client = DydxHttpClient::default();
2291
2292 let client = DydxDataClient::new(client_id, config, http_client, None);
2293 assert!(client.is_ok());
2294
2295 let client = client.unwrap();
2296 assert_eq!(client.client_id(), client_id);
2297 assert_eq!(client.venue(), *DYDX_VENUE);
2298 assert!(!client.is_connected());
2299 }
2300
2301 #[tokio::test]
2302 async fn test_data_client_lifecycle() {
2303 setup_test_env();
2304
2305 let client_id = ClientId::from("DYDX-001");
2306 let config = DydxDataClientConfig::default();
2307 let http_client = DydxHttpClient::default();
2308
2309 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2310
2311 assert!(client.start().is_ok());
2313
2314 assert!(client.stop().is_ok());
2316 assert!(!client.is_connected());
2317
2318 assert!(client.reset().is_ok());
2320
2321 assert!(client.dispose().is_ok());
2323 }
2324
2325 #[rstest]
2326 fn test_subscribe_unsubscribe_instruments_noop() {
2327 setup_test_env();
2328
2329 let client_id = ClientId::from("DYDX-TEST");
2330 let config = DydxDataClientConfig::default();
2331 let http_client = DydxHttpClient::default();
2332
2333 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2334
2335 let venue = *DYDX_VENUE;
2336 let command_id = UUID4::new();
2337 let ts_init = get_atomic_clock_realtime().get_time_ns();
2338
2339 let subscribe = SubscribeInstruments {
2340 client_id: Some(client_id),
2341 venue,
2342 command_id,
2343 ts_init,
2344 params: None,
2345 };
2346 let unsubscribe = UnsubscribeInstruments::new(None, venue, command_id, ts_init, None);
2347
2348 assert!(client.subscribe_instruments(&subscribe).is_ok());
2350 assert!(client.unsubscribe_instruments(&unsubscribe).is_ok());
2351 }
2352
2353 #[rstest]
2354 fn test_bar_type_mappings_registration() {
2355 setup_test_env();
2356
2357 let client_id = ClientId::from("DYDX-TEST");
2358 let config = DydxDataClientConfig::default();
2359 let http_client = DydxHttpClient::default();
2360
2361 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2362
2363 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2364 let spec = BarSpecification {
2365 step: std::num::NonZeroUsize::new(1).unwrap(),
2366 aggregation: BarAggregation::Minute,
2367 price_type: PriceType::Last,
2368 };
2369 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2370
2371 assert!(client.get_bar_topics().is_empty());
2373 assert!(client.get_bar_type_for_topic("BTC-USD/1MIN").is_none());
2374
2375 client
2377 .bar_type_mappings
2378 .insert("BTC-USD/1MIN".to_string(), bar_type);
2379
2380 assert_eq!(client.get_bar_topics().len(), 1);
2382 assert!(
2383 client
2384 .get_bar_topics()
2385 .contains(&"BTC-USD/1MIN".to_string())
2386 );
2387 assert_eq!(
2388 client.get_bar_type_for_topic("BTC-USD/1MIN"),
2389 Some(bar_type)
2390 );
2391
2392 let spec_5min = BarSpecification {
2394 step: std::num::NonZeroUsize::new(5).unwrap(),
2395 aggregation: BarAggregation::Minute,
2396 price_type: PriceType::Last,
2397 };
2398 let bar_type_5min = BarType::new(instrument_id, spec_5min, AggregationSource::External);
2399 client
2400 .bar_type_mappings
2401 .insert("BTC-USD/5MINS".to_string(), bar_type_5min);
2402
2403 assert_eq!(client.get_bar_topics().len(), 2);
2405 assert_eq!(
2406 client.get_bar_type_for_topic("BTC-USD/5MINS"),
2407 Some(bar_type_5min)
2408 );
2409 }
2410
2411 #[rstest]
2412 fn test_bar_type_mappings_unregistration() {
2413 setup_test_env();
2414
2415 let client_id = ClientId::from("DYDX-TEST");
2416 let config = DydxDataClientConfig::default();
2417 let http_client = DydxHttpClient::default();
2418
2419 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2420
2421 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
2422 let spec = BarSpecification {
2423 step: std::num::NonZeroUsize::new(1).unwrap(),
2424 aggregation: BarAggregation::Hour,
2425 price_type: PriceType::Last,
2426 };
2427 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2428
2429 client
2431 .bar_type_mappings
2432 .insert("ETH-USD/1HOUR".to_string(), bar_type);
2433 assert_eq!(client.get_bar_topics().len(), 1);
2434
2435 client.bar_type_mappings.remove("ETH-USD/1HOUR");
2437
2438 assert!(client.get_bar_topics().is_empty());
2440 assert!(client.get_bar_type_for_topic("ETH-USD/1HOUR").is_none());
2441 }
2442
2443 #[rstest]
2444 fn test_bar_type_mappings_lookup_nonexistent() {
2445 setup_test_env();
2446
2447 let client_id = ClientId::from("DYDX-TEST");
2448 let config = DydxDataClientConfig::default();
2449 let http_client = DydxHttpClient::default();
2450
2451 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2452
2453 assert!(client.get_bar_type_for_topic("NONEXISTENT/1MIN").is_none());
2455 assert!(client.get_bar_topics().is_empty());
2456 }
2457
2458 #[tokio::test]
2459 async fn test_handle_ws_message_deltas_updates_orderbook_and_emits_quote() {
2460 setup_test_env();
2461
2462 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2463 let instruments = Arc::new(DashMap::new());
2464 let order_books = Arc::new(DashMap::new());
2465 let last_quotes = Arc::new(DashMap::new());
2466 let ws_client: Option<DydxWebSocketClient> = None;
2467 let active_orderbook_subs = Arc::new(DashMap::new());
2468 let active_trade_subs = Arc::new(DashMap::new());
2469 let active_bar_subs = Arc::new(DashMap::new());
2470
2471 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2472 let bar_ts = get_atomic_clock_realtime().get_time_ns();
2473
2474 use nautilus_model::{identifiers::Symbol, instruments::CryptoPerpetual, types::Currency};
2476 let symbol = Symbol::from("BTC-USD-PERP");
2477 let instrument = CryptoPerpetual::new(
2478 instrument_id,
2479 symbol,
2480 Currency::BTC(),
2481 Currency::USD(),
2482 Currency::USD(),
2483 false,
2484 2,
2485 4,
2486 Price::from("0.01"),
2487 Quantity::from("0.0001"),
2488 None,
2489 None,
2490 None,
2491 None,
2492 None,
2493 None,
2494 None,
2495 None,
2496 None,
2497 None,
2498 None,
2499 None,
2500 bar_ts,
2501 bar_ts,
2502 );
2503 instruments.insert(
2504 Ustr::from("BTC-USD-PERP"),
2505 InstrumentAny::CryptoPerpetual(instrument),
2506 );
2507
2508 let price = Price::from("100.00");
2509 let size = Quantity::from("1.0");
2510
2511 let bid_delta = OrderBookDelta::new(
2513 instrument_id,
2514 BookAction::Add,
2515 BookOrder::new(OrderSide::Buy, price, size, 1),
2516 0,
2517 1,
2518 bar_ts,
2519 bar_ts,
2520 );
2521 let ask_delta = OrderBookDelta::new(
2522 instrument_id,
2523 BookAction::Add,
2524 BookOrder::new(OrderSide::Sell, Price::from("101.00"), size, 1),
2525 0,
2526 1,
2527 bar_ts,
2528 bar_ts,
2529 );
2530 let deltas = OrderBookDeltas::new(instrument_id, vec![bid_delta, ask_delta]);
2531
2532 let message = crate::websocket::enums::NautilusWsMessage::Deltas(Box::new(deltas));
2533
2534 let incomplete_bars = Arc::new(DashMap::new());
2535 let ctx = WsMessageContext {
2536 data_sender: &sender,
2537 instruments: &instruments,
2538 order_books: &order_books,
2539 last_quotes: &last_quotes,
2540 ws_client: &ws_client,
2541 active_orderbook_subs: &active_orderbook_subs,
2542 active_trade_subs: &active_trade_subs,
2543 active_bar_subs: &active_bar_subs,
2544 incomplete_bars: &incomplete_bars,
2545 };
2546 DydxDataClient::handle_ws_message(message, &ctx);
2547
2548 assert!(order_books.get(&instrument_id).is_some());
2550 assert!(last_quotes.get(&instrument_id).is_some());
2551
2552 let mut saw_quote = false;
2554 let mut saw_deltas = false;
2555
2556 while let Ok(event) = rx.try_recv() {
2557 if let DataEvent::Data(data) = event {
2558 match data {
2559 NautilusData::Quote(_) => saw_quote = true,
2560 NautilusData::Deltas(_) => saw_deltas = true,
2561 _ => {}
2562 }
2563 }
2564 }
2565
2566 assert!(saw_quote);
2567 assert!(saw_deltas);
2568 }
2569
2570 #[rstest]
2571 fn test_handle_ws_message_error_does_not_panic() {
2572 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2575 let instruments = Arc::new(DashMap::new());
2576 let order_books = Arc::new(DashMap::new());
2577 let last_quotes = Arc::new(DashMap::new());
2578 let ws_client: Option<DydxWebSocketClient> = None;
2579 let active_orderbook_subs = Arc::new(DashMap::new());
2580 let active_trade_subs = Arc::new(DashMap::new());
2581 let active_bar_subs = Arc::new(DashMap::new());
2582 let incomplete_bars = Arc::new(DashMap::new());
2583
2584 let ctx = WsMessageContext {
2585 data_sender: &sender,
2586 instruments: &instruments,
2587 order_books: &order_books,
2588 last_quotes: &last_quotes,
2589 ws_client: &ws_client,
2590 active_orderbook_subs: &active_orderbook_subs,
2591 active_trade_subs: &active_trade_subs,
2592 active_bar_subs: &active_bar_subs,
2593 incomplete_bars: &incomplete_bars,
2594 };
2595
2596 let err = crate::websocket::error::DydxWebSocketError::from_message(
2597 "malformed WebSocket payload".to_string(),
2598 );
2599
2600 DydxDataClient::handle_ws_message(
2601 crate::websocket::enums::NautilusWsMessage::Error(err),
2602 &ctx,
2603 );
2604 }
2605
2606 #[tokio::test]
2607 async fn test_request_bars_partitioning_math_does_not_panic() {
2608 setup_test_env();
2609
2610 let client_id = ClientId::from("DYDX-BARS");
2611 let config = DydxDataClientConfig::default();
2612 let http_client = DydxHttpClient::default();
2613
2614 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2615
2616 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
2617 let spec = BarSpecification {
2618 step: std::num::NonZeroUsize::new(1).unwrap(),
2619 aggregation: BarAggregation::Minute,
2620 price_type: PriceType::Last,
2621 };
2622 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2623
2624 let now = Utc::now();
2625 let start = Some(now - chrono::Duration::hours(10));
2626 let end = Some(now);
2627
2628 let request = RequestBars::new(
2629 bar_type,
2630 start,
2631 end,
2632 None,
2633 Some(client_id),
2634 UUID4::new(),
2635 get_atomic_clock_realtime().get_time_ns(),
2636 None,
2637 );
2638
2639 assert!(client.request_bars(&request).is_ok());
2642 }
2643
2644 #[tokio::test]
2645 async fn test_request_bars_partitioning_months_range_does_not_overflow() {
2646 setup_test_env();
2647
2648 let now = Utc::now();
2650 let candle = crate::http::models::Candle {
2651 started_at: now - chrono::Duration::minutes(1),
2652 ticker: "BTC-USD".to_string(),
2653 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2654 open: dec!(100.0),
2655 high: dec!(101.0),
2656 low: dec!(99.0),
2657 close: dec!(100.5),
2658 base_token_volume: dec!(1.0),
2659 usd_volume: dec!(100.0),
2660 trades: 10,
2661 starting_open_interest: dec!(1000.0),
2662 };
2663 let candles_response = crate::http::models::CandlesResponse {
2664 candles: vec![candle],
2665 };
2666 let state = CandlesTestState {
2667 response: Arc::new(candles_response),
2668 };
2669 let addr = start_candles_test_server(state).await;
2670 let base_url = format!("http://{addr}");
2671
2672 let client_id = ClientId::from("DYDX-BARS-MONTHS");
2673 let config = DydxDataClientConfig {
2674 base_url_http: Some(base_url),
2675 is_testnet: true,
2676 ..Default::default()
2677 };
2678
2679 let http_client = DydxHttpClient::new(
2680 config.base_url_http.clone(),
2681 config.http_timeout_secs,
2682 config.http_proxy_url.clone(),
2683 config.is_testnet,
2684 None,
2685 )
2686 .unwrap();
2687
2688 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
2689
2690 let instrument = create_test_instrument_any();
2692 let instrument_id = instrument.id();
2693 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
2694 client.instruments.insert(symbol_key, instrument);
2695
2696 let spec = BarSpecification {
2697 step: std::num::NonZeroUsize::new(1).unwrap(),
2698 aggregation: BarAggregation::Minute,
2699 price_type: PriceType::Last,
2700 };
2701 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2702
2703 let start = Some(now - chrono::Duration::days(90));
2705 let end = Some(now);
2706
2707 let limit = Some(std::num::NonZeroUsize::new(10).unwrap());
2709
2710 let request = RequestBars::new(
2711 bar_type,
2712 start,
2713 end,
2714 limit,
2715 Some(client_id),
2716 UUID4::new(),
2717 get_atomic_clock_realtime().get_time_ns(),
2718 None,
2719 );
2720
2721 assert!(client.request_bars(&request).is_ok());
2722 }
2723
    /// State handed to the order book test server's request handler.
    #[derive(Clone)]
    struct OrderbookTestState {
        /// Canned order book response; the handler clones it out of the `Arc`
        /// for every request.
        snapshot: Arc<crate::http::models::OrderbookResponse>,
    }
2728
    /// State handed to the trades test server's request handler: the canned
    /// response plus slots recording what the most recent request asked for.
    #[derive(Clone)]
    struct TradesTestState {
        /// Canned trades response; cloned out of the `Arc` for every request.
        response: Arc<crate::http::models::TradesResponse>,
        /// Ticker path segment of the most recent request handled.
        last_ticker: Arc<tokio::sync::Mutex<Option<String>>>,
        /// `limit` query parameter of the most recent request handled. The
        /// handler stores `Some(None)` when the parameter is absent or does
        /// not parse as `u32`.
        last_limit: Arc<tokio::sync::Mutex<Option<Option<u32>>>>,
    }
2735
    /// State handed to the candles test server's request handler.
    #[derive(Clone)]
    struct CandlesTestState {
        /// Canned candles response; the handler clones it out of the `Arc`
        /// for every request.
        response: Arc<crate::http::models::CandlesResponse>,
    }
2740
2741 async fn start_orderbook_test_server(state: OrderbookTestState) -> SocketAddr {
2742 async fn handle_orderbook(
2743 Path(_ticker): Path<String>,
2744 State(state): State<OrderbookTestState>,
2745 ) -> Json<crate::http::models::OrderbookResponse> {
2746 Json((*state.snapshot).clone())
2747 }
2748
2749 let router = Router::new().route(
2750 "/v4/orderbooks/perpetualMarket/{ticker}",
2751 get(handle_orderbook).with_state(state),
2752 );
2753
2754 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2755 let addr = listener.local_addr().unwrap();
2756
2757 tokio::spawn(async move {
2758 axum::serve(listener, router.into_make_service())
2759 .await
2760 .unwrap();
2761 });
2762
2763 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2764 addr
2765 }
2766
2767 async fn start_trades_test_server(state: TradesTestState) -> SocketAddr {
2768 async fn handle_trades(
2769 Path(ticker): Path<String>,
2770 Query(params): Query<HashMap<String, String>>,
2771 State(state): State<TradesTestState>,
2772 ) -> Json<crate::http::models::TradesResponse> {
2773 {
2774 let mut last_ticker = state.last_ticker.lock().await;
2775 *last_ticker = Some(ticker);
2776 }
2777
2778 let limit = params
2779 .get("limit")
2780 .and_then(|value| value.parse::<u32>().ok());
2781 {
2782 let mut last_limit = state.last_limit.lock().await;
2783 *last_limit = Some(limit);
2784 }
2785
2786 Json((*state.response).clone())
2787 }
2788
2789 let router = Router::new().route(
2790 "/v4/trades/perpetualMarket/{ticker}",
2791 get(handle_trades).with_state(state),
2792 );
2793
2794 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2795 let addr = listener.local_addr().unwrap();
2796
2797 tokio::spawn(async move {
2798 axum::serve(listener, router.into_make_service())
2799 .await
2800 .unwrap();
2801 });
2802
2803 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2804 addr
2805 }
2806
2807 async fn start_candles_test_server(state: CandlesTestState) -> SocketAddr {
2808 async fn handle_candles(
2809 Path(_ticker): Path<String>,
2810 Query(_params): Query<HashMap<String, String>>,
2811 State(state): State<CandlesTestState>,
2812 ) -> Json<crate::http::models::CandlesResponse> {
2813 Json((*state.response).clone())
2814 }
2815
2816 let router = Router::new().route(
2817 "/v4/candles/perpetualMarkets/{ticker}",
2818 get(handle_candles).with_state(state),
2819 );
2820
2821 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2822 let addr = listener.local_addr().unwrap();
2823
2824 tokio::spawn(async move {
2825 axum::serve(listener, router.into_make_service())
2826 .await
2827 .unwrap();
2828 });
2829
2830 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2831 addr
2832 }
2833
2834 fn create_test_instrument_any() -> InstrumentAny {
2835 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
2836
2837 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2838 instrument_id,
2839 instrument_id.symbol,
2840 Currency::BTC(),
2841 Currency::USD(),
2842 Currency::USD(),
2843 false,
2844 2, 8, Price::new(0.01, 2), Quantity::new(0.001, 8), Some(Quantity::new(1.0, 0)), Some(Quantity::new(0.001, 8)), Some(Quantity::new(100000.0, 8)), Some(Quantity::new(0.001, 8)), None, None, Some(Price::new(1000000.0, 2)), Some(Price::new(0.01, 2)), Some(dec!(0.05)), Some(dec!(0.03)), Some(dec!(0.0002)), Some(dec!(0.0005)), UnixNanos::default(), UnixNanos::default(), ))
2863 }
2864
2865 #[tokio::test]
2870 async fn test_candle_to_bar_price_size_edge_cases() {
2871 setup_test_env();
2872
2873 let clock = get_atomic_clock_realtime();
2874 let now = Utc::now();
2875
2876 let candle = Candle {
2878 started_at: now,
2879 ticker: "BTC-USD".to_string(),
2880 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2881 open: dec!(123456789.123456),
2882 high: dec!(987654321.987654), low: dec!(123456.789), close: dec!(223456789.123456), base_token_volume: dec!(0.00000001),
2886 usd_volume: dec!(1234500.0),
2887 trades: 42,
2888 starting_open_interest: dec!(1000.0),
2889 };
2890
2891 let instrument = create_test_instrument_any();
2892 let instrument_id = instrument.id();
2893 let spec = BarSpecification {
2894 step: std::num::NonZeroUsize::new(1).unwrap(),
2895 aggregation: BarAggregation::Minute,
2896 price_type: PriceType::Last,
2897 };
2898 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2899
2900 let bar = DydxDataClient::candle_to_bar(
2901 &candle,
2902 bar_type,
2903 instrument.price_precision(),
2904 instrument.size_precision(),
2905 60,
2906 clock,
2907 )
2908 .expect("candle_to_bar should handle large/scientific values");
2909
2910 assert!(bar.open.as_f64() > 0.0);
2911 assert!(bar.high.as_f64() >= bar.low.as_f64());
2912 assert!(bar.volume.as_f64() > 0.0);
2913 }
2914
2915 #[tokio::test]
2916 async fn test_candle_to_bar_ts_event_overflow_safe() {
2917 setup_test_env();
2918
2919 let clock = get_atomic_clock_realtime();
2920 let now = Utc::now();
2921
2922 let candle = Candle {
2923 started_at: now,
2924 ticker: "BTC-USD".to_string(),
2925 resolution: crate::common::enums::DydxCandleResolution::OneDay,
2926 open: Decimal::from(1),
2927 high: Decimal::from(1),
2928 low: Decimal::from(1),
2929 close: Decimal::from(1),
2930 base_token_volume: Decimal::from(1),
2931 usd_volume: Decimal::from(1),
2932 trades: 1,
2933 starting_open_interest: Decimal::from(1),
2934 };
2935
2936 let instrument = create_test_instrument_any();
2937 let instrument_id = instrument.id();
2938 let spec = BarSpecification {
2939 step: std::num::NonZeroUsize::new(1).unwrap(),
2940 aggregation: BarAggregation::Day,
2941 price_type: PriceType::Last,
2942 };
2943 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
2944
2945 let bar_secs = i64::MAX / 1_000_000_000;
2947 let bar = DydxDataClient::candle_to_bar(
2948 &candle,
2949 bar_type,
2950 instrument.price_precision(),
2951 instrument.size_precision(),
2952 bar_secs,
2953 clock,
2954 )
2955 .expect("candle_to_bar should not overflow on ts_event");
2956
2957 assert!(bar.ts_event.as_u64() >= bar.ts_init.as_u64());
2958 }
2959
2960 #[tokio::test]
2961 async fn test_request_bars_incomplete_bar_filtering_with_clock_skew() {
2962 let clock = get_atomic_clock_realtime();
2965 let now = Utc::now();
2966
2967 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2970 set_data_event_sender(sender);
2971
2972 let candle_past = Candle {
2974 started_at: now - chrono::Duration::minutes(2),
2975 ticker: "BTC-USD".to_string(),
2976 resolution: crate::common::enums::DydxCandleResolution::OneMinute,
2977 open: Decimal::from(1),
2978 high: Decimal::from(2),
2979 low: Decimal::from(1),
2980 close: Decimal::from(1),
2981 base_token_volume: Decimal::from(1),
2982 usd_volume: Decimal::from(1),
2983 trades: 1,
2984 starting_open_interest: Decimal::from(1),
2985 };
2986 let candle_future = Candle {
2987 started_at: now + chrono::Duration::minutes(2),
2988 ..candle_past.clone()
2989 };
2990
2991 let candles_response = CandlesResponse {
2992 candles: vec![candle_past, candle_future],
2993 };
2994
2995 let state = CandlesTestState {
2996 response: Arc::new(candles_response),
2997 };
2998 let addr = start_candles_test_server(state).await;
2999 let base_url = format!("http://{addr}");
3000
3001 let client_id = ClientId::from("DYDX-BARS-SKEW");
3002 let config = DydxDataClientConfig {
3003 base_url_http: Some(base_url),
3004 is_testnet: true,
3005 ..Default::default()
3006 };
3007
3008 let http_client = DydxHttpClient::new(
3009 config.base_url_http.clone(),
3010 config.http_timeout_secs,
3011 config.http_proxy_url.clone(),
3012 config.is_testnet,
3013 None,
3014 )
3015 .unwrap();
3016
3017 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3018
3019 let instrument = create_test_instrument_any();
3020 let instrument_id = instrument.id();
3021 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3022 client.instruments.insert(symbol_key, instrument);
3023
3024 let spec = BarSpecification {
3025 step: std::num::NonZeroUsize::new(1).unwrap(),
3026 aggregation: BarAggregation::Minute,
3027 price_type: PriceType::Last,
3028 };
3029 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
3030
3031 let request = RequestBars::new(
3032 bar_type,
3033 Some(now - chrono::Duration::minutes(5)),
3034 Some(now + chrono::Duration::minutes(5)),
3035 None,
3036 Some(client_id),
3037 UUID4::new(),
3038 clock.get_time_ns(),
3039 None,
3040 );
3041
3042 assert!(client.request_bars(&request).is_ok());
3043
3044 let timeout = tokio::time::Duration::from_secs(3);
3045 if let Ok(Some(DataEvent::Response(DataResponse::Bars(resp)))) =
3046 tokio::time::timeout(timeout, rx.recv()).await
3047 {
3048 assert_eq!(resp.data.len(), 1);
3050 }
3051 }
3052
3053 #[rstest]
3054 fn test_decimal_to_f64_precision_loss_within_tolerance() {
3055 let price_value = 12345.125_f64;
3057 let qty_value = 0.00012345_f64;
3058
3059 let price = Price::new(price_value, 6);
3060 let qty = Quantity::new(qty_value, 8);
3061
3062 let price_diff = (price.as_f64() - price_value).abs();
3063 let qty_diff = (qty.as_f64() - qty_value).abs();
3064
3065 assert!(price_diff < 1e-10);
3067 assert!(qty_diff < 1e-12);
3068 }
3069
3070 #[tokio::test]
3071 async fn test_orderbook_refresh_task_applies_http_snapshot_and_emits_event() {
3072 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3074 set_data_event_sender(sender);
3075
3076 let snapshot = crate::http::models::OrderbookResponse {
3078 bids: vec![crate::http::models::OrderbookLevel {
3079 price: dec!(100.0),
3080 size: dec!(1.0),
3081 }],
3082 asks: vec![crate::http::models::OrderbookLevel {
3083 price: dec!(101.0),
3084 size: dec!(2.0),
3085 }],
3086 };
3087 let state = OrderbookTestState {
3088 snapshot: Arc::new(snapshot),
3089 };
3090 let addr = start_orderbook_test_server(state).await;
3091 let base_url = format!("http://{addr}");
3092
3093 let client_id = ClientId::from("DYDX-REFRESH");
3095 let config = DydxDataClientConfig {
3096 is_testnet: true,
3097 base_url_http: Some(base_url),
3098 orderbook_refresh_interval_secs: Some(1),
3099 instrument_refresh_interval_secs: None,
3100 ..Default::default()
3101 };
3102
3103 let http_client = DydxHttpClient::new(
3104 config.base_url_http.clone(),
3105 config.http_timeout_secs,
3106 config.http_proxy_url.clone(),
3107 config.is_testnet,
3108 None,
3109 )
3110 .unwrap();
3111
3112 let mut client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3113
3114 let instrument = create_test_instrument_any();
3116 let instrument_id = instrument.id();
3117 let symbol_key = Ustr::from(instrument_id.symbol.as_ref());
3118 client.instruments.insert(symbol_key, instrument);
3119 client.order_books.insert(
3120 instrument_id,
3121 OrderBook::new(instrument_id, BookType::L2_MBP),
3122 );
3123 client.active_orderbook_subs.insert(instrument_id, ());
3124
3125 client.start_orderbook_refresh_task().unwrap();
3127
3128 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
3129 let mut saw_snapshot_event = false;
3130
3131 while std::time::Instant::now() < deadline {
3132 if let Ok(Some(DataEvent::Data(NautilusData::Deltas(_)))) =
3133 tokio::time::timeout(std::time::Duration::from_millis(250), rx.recv()).await
3134 {
3135 saw_snapshot_event = true;
3136 break;
3137 }
3138 }
3139
3140 assert!(
3141 saw_snapshot_event,
3142 "expected at least one snapshot deltas event from refresh task"
3143 );
3144
3145 let book = client
3147 .order_books
3148 .get(&instrument_id)
3149 .expect("orderbook should exist after refresh");
3150 let best_bid = book.best_bid_price().expect("best bid should be set");
3151 let best_ask = book.best_ask_price().expect("best ask should be set");
3152
3153 assert_eq!(best_bid, Price::from("100.00"));
3154 assert_eq!(best_ask, Price::from("101.00"));
3155 }
3156
3157 #[rstest]
3158 fn test_resolve_crossed_order_book_bid_larger_than_ask() {
3159 let instrument = create_test_instrument_any();
3162 let instrument_id = instrument.id();
3163 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3164 let ts_init = get_atomic_clock_realtime().get_time_ns();
3165
3166 let initial_deltas = vec![
3168 OrderBookDelta::new(
3169 instrument_id,
3170 BookAction::Add,
3171 BookOrder::new(
3172 OrderSide::Buy,
3173 Price::from("99.00"),
3174 Quantity::from("1.0"),
3175 0,
3176 ),
3177 0,
3178 0,
3179 ts_init,
3180 ts_init,
3181 ),
3182 OrderBookDelta::new(
3183 instrument_id,
3184 BookAction::Add,
3185 BookOrder::new(
3186 OrderSide::Sell,
3187 Price::from("101.00"),
3188 Quantity::from("2.0"),
3189 0,
3190 ),
3191 0,
3192 0,
3193 ts_init,
3194 ts_init,
3195 ),
3196 ];
3197 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3198 .unwrap();
3199
3200 let crossed_deltas = vec![OrderBookDelta::new(
3202 instrument_id,
3203 BookAction::Add,
3204 BookOrder::new(
3205 OrderSide::Buy,
3206 Price::from("102.00"),
3207 Quantity::from("5.0"),
3208 0,
3209 ),
3210 0,
3211 0,
3212 ts_init,
3213 ts_init,
3214 )];
3215 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3216
3217 let resolved =
3218 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3219 .unwrap();
3220
3221 assert_eq!(book.best_bid_price(), Some(Price::from("102.00")));
3224 assert!(book.best_bid_size().unwrap().as_f64() < 5.0); assert!(
3226 book.best_ask_price().is_none()
3227 || book.best_ask_price().unwrap() > book.best_bid_price().unwrap()
3228 ); assert!(resolved.deltas.len() > 1); }
3233
3234 #[rstest]
3235 fn test_resolve_crossed_order_book_ask_larger_than_bid() {
3236 let instrument = create_test_instrument_any();
3239 let instrument_id = instrument.id();
3240 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3241 let ts_init = get_atomic_clock_realtime().get_time_ns();
3242
3243 let initial_deltas = vec![
3245 OrderBookDelta::new(
3246 instrument_id,
3247 BookAction::Add,
3248 BookOrder::new(
3249 OrderSide::Buy,
3250 Price::from("99.00"),
3251 Quantity::from("1.0"),
3252 0,
3253 ),
3254 0,
3255 0,
3256 ts_init,
3257 ts_init,
3258 ),
3259 OrderBookDelta::new(
3260 instrument_id,
3261 BookAction::Add,
3262 BookOrder::new(
3263 OrderSide::Sell,
3264 Price::from("101.00"),
3265 Quantity::from("5.0"),
3266 0,
3267 ),
3268 0,
3269 0,
3270 ts_init,
3271 ts_init,
3272 ),
3273 ];
3274 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3275 .unwrap();
3276
3277 let crossed_deltas = vec![OrderBookDelta::new(
3279 instrument_id,
3280 BookAction::Add,
3281 BookOrder::new(
3282 OrderSide::Buy,
3283 Price::from("102.00"),
3284 Quantity::from("2.0"),
3285 0,
3286 ),
3287 0,
3288 0,
3289 ts_init,
3290 ts_init,
3291 )];
3292 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3293
3294 let resolved =
3295 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3296 .unwrap();
3297
3298 assert_eq!(book.best_ask_price(), Some(Price::from("101.00")));
3300 assert!(book.best_ask_size().unwrap().as_f64() < 5.0); assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap()); assert!(resolved.deltas.len() > 1);
3306 }
3307
3308 #[rstest]
3309 fn test_resolve_crossed_order_book_equal_sizes() {
3310 let instrument = create_test_instrument_any();
3313 let instrument_id = instrument.id();
3314 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3315 let ts_init = get_atomic_clock_realtime().get_time_ns();
3316
3317 let initial_deltas = vec![
3319 OrderBookDelta::new(
3320 instrument_id,
3321 BookAction::Add,
3322 BookOrder::new(
3323 OrderSide::Buy,
3324 Price::from("99.00"),
3325 Quantity::from("1.0"),
3326 0,
3327 ),
3328 0,
3329 0,
3330 ts_init,
3331 ts_init,
3332 ),
3333 OrderBookDelta::new(
3334 instrument_id,
3335 BookAction::Add,
3336 BookOrder::new(
3337 OrderSide::Sell,
3338 Price::from("101.00"),
3339 Quantity::from("3.0"),
3340 0,
3341 ),
3342 0,
3343 0,
3344 ts_init,
3345 ts_init,
3346 ),
3347 ];
3348 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3349 .unwrap();
3350
3351 let crossed_deltas = vec![OrderBookDelta::new(
3353 instrument_id,
3354 BookAction::Add,
3355 BookOrder::new(
3356 OrderSide::Buy,
3357 Price::from("102.00"),
3358 Quantity::from("3.0"),
3359 0,
3360 ),
3361 0,
3362 0,
3363 ts_init,
3364 ts_init,
3365 )];
3366 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3367
3368 let resolved =
3369 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3370 .unwrap();
3371
3372 assert_eq!(book.best_bid_price(), Some(Price::from("99.00"))); if let Some(ask_price) = book.best_ask_price() {
3376 assert!(ask_price > book.best_bid_price().unwrap()); }
3378
3379 assert!(resolved.deltas.len() > 1);
3381 }
3382
3383 #[rstest]
3384 fn test_resolve_crossed_order_book_multiple_iterations() {
3385 let instrument = create_test_instrument_any();
3387 let instrument_id = instrument.id();
3388 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3389 let ts_init = get_atomic_clock_realtime().get_time_ns();
3390
3391 let initial_deltas = vec![
3393 OrderBookDelta::new(
3394 instrument_id,
3395 BookAction::Add,
3396 BookOrder::new(
3397 OrderSide::Buy,
3398 Price::from("98.00"),
3399 Quantity::from("1.0"),
3400 0,
3401 ),
3402 0,
3403 0,
3404 ts_init,
3405 ts_init,
3406 ),
3407 OrderBookDelta::new(
3408 instrument_id,
3409 BookAction::Add,
3410 BookOrder::new(
3411 OrderSide::Sell,
3412 Price::from("100.00"),
3413 Quantity::from("1.0"),
3414 0,
3415 ),
3416 0,
3417 0,
3418 ts_init,
3419 ts_init,
3420 ),
3421 OrderBookDelta::new(
3422 instrument_id,
3423 BookAction::Add,
3424 BookOrder::new(
3425 OrderSide::Sell,
3426 Price::from("101.00"),
3427 Quantity::from("1.0"),
3428 0,
3429 ),
3430 0,
3431 0,
3432 ts_init,
3433 ts_init,
3434 ),
3435 ];
3436 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3437 .unwrap();
3438
3439 let crossed_deltas = vec![
3441 OrderBookDelta::new(
3442 instrument_id,
3443 BookAction::Add,
3444 BookOrder::new(
3445 OrderSide::Buy,
3446 Price::from("102.00"),
3447 Quantity::from("1.0"),
3448 0,
3449 ),
3450 0,
3451 0,
3452 ts_init,
3453 ts_init,
3454 ),
3455 OrderBookDelta::new(
3456 instrument_id,
3457 BookAction::Add,
3458 BookOrder::new(
3459 OrderSide::Buy,
3460 Price::from("103.00"),
3461 Quantity::from("1.0"),
3462 0,
3463 ),
3464 0,
3465 0,
3466 ts_init,
3467 ts_init,
3468 ),
3469 ];
3470 let venue_deltas = OrderBookDeltas::new(instrument_id, crossed_deltas);
3471
3472 let resolved =
3473 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3474 .unwrap();
3475
3476 if let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) {
3478 assert!(ask_price > bid_price, "Book should be uncrossed");
3479 }
3480
3481 assert!(resolved.deltas.len() > 2); }
3484
3485 #[rstest]
3486 fn test_resolve_crossed_order_book_non_crossed_passthrough() {
3487 let instrument = create_test_instrument_any();
3489 let instrument_id = instrument.id();
3490 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
3491 let ts_init = get_atomic_clock_realtime().get_time_ns();
3492
3493 let initial_deltas = vec![
3495 OrderBookDelta::new(
3496 instrument_id,
3497 BookAction::Add,
3498 BookOrder::new(
3499 OrderSide::Buy,
3500 Price::from("99.00"),
3501 Quantity::from("1.0"),
3502 0,
3503 ),
3504 0,
3505 0,
3506 ts_init,
3507 ts_init,
3508 ),
3509 OrderBookDelta::new(
3510 instrument_id,
3511 BookAction::Add,
3512 BookOrder::new(
3513 OrderSide::Sell,
3514 Price::from("101.00"),
3515 Quantity::from("1.0"),
3516 0,
3517 ),
3518 0,
3519 0,
3520 ts_init,
3521 ts_init,
3522 ),
3523 ];
3524 book.apply_deltas(&OrderBookDeltas::new(instrument_id, initial_deltas))
3525 .unwrap();
3526
3527 let new_deltas = vec![OrderBookDelta::new(
3529 instrument_id,
3530 BookAction::Add,
3531 BookOrder::new(
3532 OrderSide::Buy,
3533 Price::from("98.50"),
3534 Quantity::from("2.0"),
3535 0,
3536 ),
3537 0,
3538 0,
3539 ts_init,
3540 ts_init,
3541 )];
3542 let venue_deltas = OrderBookDeltas::new(instrument_id, new_deltas.clone());
3543
3544 let original_bid = book.best_bid_price();
3545 let original_ask = book.best_ask_price();
3546
3547 let resolved =
3548 DydxDataClient::resolve_crossed_order_book(&mut book, venue_deltas, &instrument)
3549 .unwrap();
3550
3551 assert_eq!(resolved.deltas.len(), new_deltas.len());
3553 assert_eq!(book.best_bid_price(), original_bid);
3554 assert_eq!(book.best_ask_price(), original_ask);
3555 assert!(book.best_ask_price().unwrap() > book.best_bid_price().unwrap());
3556 }
3557
3558 #[tokio::test]
3559 async fn test_request_instruments_successful_fetch() {
3560 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3562 set_data_event_sender(sender);
3563
3564 let client_id = ClientId::from("DYDX-TEST");
3565 let config = DydxDataClientConfig::default();
3566 let http_client = DydxHttpClient::default();
3567 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3568
3569 let request = RequestInstruments::new(
3570 None,
3571 None,
3572 Some(client_id),
3573 Some(*DYDX_VENUE),
3574 UUID4::new(),
3575 get_atomic_clock_realtime().get_time_ns(),
3576 None,
3577 );
3578
3579 assert!(client.request_instruments(&request).is_ok());
3581
3582 let timeout = tokio::time::Duration::from_secs(5);
3584 let result = tokio::time::timeout(timeout, rx.recv()).await;
3585
3586 match result {
3587 Ok(Some(DataEvent::Response(resp))) => {
3588 if let DataResponse::Instruments(inst_resp) = resp {
3589 assert_eq!(inst_resp.correlation_id, request.request_id);
3591 assert_eq!(inst_resp.client_id, client_id);
3592 assert_eq!(inst_resp.venue, *DYDX_VENUE);
3593 assert!(inst_resp.start.is_none());
3594 assert!(inst_resp.end.is_none());
3595 }
3597 }
3598 Ok(Some(_)) => panic!("Expected InstrumentsResponse"),
3599 Ok(None) => panic!("Channel closed unexpectedly"),
3600 Err(_) => {
3601 println!("Test timed out - testnet may be unreachable");
3603 }
3604 }
3605 }
3606
3607 #[tokio::test]
3608 async fn test_request_instruments_empty_response_on_http_error() {
3609 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3611 set_data_event_sender(sender);
3612
3613 let client_id = ClientId::from("DYDX-ERROR-TEST");
3614 let config = DydxDataClientConfig {
3615 base_url_http: Some("http://invalid-url-does-not-exist.local".to_string()),
3616 ..Default::default()
3617 };
3618 let http_client = DydxHttpClient::new(
3619 config.base_url_http.clone(),
3620 config.http_timeout_secs,
3621 config.http_proxy_url.clone(),
3622 config.is_testnet,
3623 None,
3624 )
3625 .unwrap();
3626
3627 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3628
3629 let request = RequestInstruments::new(
3630 None,
3631 None,
3632 Some(client_id),
3633 Some(*DYDX_VENUE),
3634 UUID4::new(),
3635 get_atomic_clock_realtime().get_time_ns(),
3636 None,
3637 );
3638
3639 assert!(client.request_instruments(&request).is_ok());
3640
3641 let timeout = tokio::time::Duration::from_secs(3);
3643 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3644 tokio::time::timeout(timeout, rx.recv()).await
3645 {
3646 assert!(
3647 resp.data.is_empty(),
3648 "Expected empty instruments on HTTP error"
3649 );
3650 assert_eq!(resp.correlation_id, request.request_id);
3651 assert_eq!(resp.client_id, client_id);
3652 }
3653 }
3654
3655 #[tokio::test]
3656 async fn test_request_instruments_caching() {
3657 setup_test_env();
3659
3660 let client_id = ClientId::from("DYDX-CACHE-TEST");
3661 let config = DydxDataClientConfig::default();
3662 let http_client = DydxHttpClient::default();
3663 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3664
3665 let initial_cache_size = client.instruments.len();
3666
3667 let request = RequestInstruments::new(
3668 None,
3669 None,
3670 Some(client_id),
3671 Some(*DYDX_VENUE),
3672 UUID4::new(),
3673 get_atomic_clock_realtime().get_time_ns(),
3674 None,
3675 );
3676
3677 assert!(client.request_instruments(&request).is_ok());
3678
3679 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
3681
3682 let final_cache_size = client.instruments.len();
3684 assert!(final_cache_size >= initial_cache_size);
3687 }
3688
3689 #[tokio::test]
3690 async fn test_request_instruments_correlation_id_matching() {
3691 setup_test_env();
3693
3694 let client_id = ClientId::from("DYDX-CORR-TEST");
3695 let config = DydxDataClientConfig::default();
3696 let http_client = DydxHttpClient::default();
3697 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3698
3699 let request_id = UUID4::new();
3700 let request = RequestInstruments::new(
3701 None,
3702 None,
3703 Some(client_id),
3704 Some(*DYDX_VENUE),
3705 request_id,
3706 get_atomic_clock_realtime().get_time_ns(),
3707 None,
3708 );
3709
3710 assert!(client.request_instruments(&request).is_ok());
3712 }
3713
3714 #[tokio::test]
3715 async fn test_request_instruments_venue_assignment() {
3716 setup_test_env();
3718
3719 let client_id = ClientId::from("DYDX-VENUE-TEST");
3720 let config = DydxDataClientConfig::default();
3721 let http_client = DydxHttpClient::default();
3722 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3723
3724 assert_eq!(client.venue(), *DYDX_VENUE);
3725
3726 let request = RequestInstruments::new(
3727 None,
3728 None,
3729 Some(client_id),
3730 Some(*DYDX_VENUE),
3731 UUID4::new(),
3732 get_atomic_clock_realtime().get_time_ns(),
3733 None,
3734 );
3735
3736 assert!(client.request_instruments(&request).is_ok());
3737 }
3738
3739 #[tokio::test]
3740 async fn test_request_instruments_timestamp_handling() {
3741 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3743 set_data_event_sender(sender);
3744
3745 let client_id = ClientId::from("DYDX-TS-TEST");
3746 let config = DydxDataClientConfig::default();
3747 let http_client = DydxHttpClient::default();
3748 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3749
3750 let now = Utc::now();
3751 let start = Some(now - chrono::Duration::hours(24));
3752 let end = Some(now);
3753
3754 let request = RequestInstruments::new(
3755 start,
3756 end,
3757 Some(client_id),
3758 Some(*DYDX_VENUE),
3759 UUID4::new(),
3760 get_atomic_clock_realtime().get_time_ns(),
3761 None,
3762 );
3763
3764 assert!(client.request_instruments(&request).is_ok());
3765
3766 let timeout = tokio::time::Duration::from_secs(3);
3768 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3769 tokio::time::timeout(timeout, rx.recv()).await
3770 {
3771 assert!(resp.start.unwrap() > 0);
3773 assert!(resp.end.unwrap() > 0);
3774 assert!(resp.start.unwrap() <= resp.end.unwrap());
3775 assert!(resp.ts_init > 0);
3776 }
3777 }
3778
3779 #[tokio::test]
3780 async fn test_request_instruments_with_start_only() {
3781 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3783 set_data_event_sender(sender);
3784
3785 let client_id = ClientId::from("DYDX-TS-START-ONLY");
3786 let config = DydxDataClientConfig::default();
3787 let http_client = DydxHttpClient::default();
3788 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3789
3790 let now = Utc::now();
3791 let start = Some(now - chrono::Duration::hours(24));
3792
3793 let request = RequestInstruments::new(
3794 start,
3795 None,
3796 Some(client_id),
3797 Some(*DYDX_VENUE),
3798 UUID4::new(),
3799 get_atomic_clock_realtime().get_time_ns(),
3800 None,
3801 );
3802
3803 assert!(client.request_instruments(&request).is_ok());
3804
3805 let timeout = tokio::time::Duration::from_secs(3);
3806 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3807 tokio::time::timeout(timeout, rx.recv()).await
3808 {
3809 assert!(resp.start.is_some());
3810 assert!(resp.end.is_none());
3811 assert!(resp.ts_init > 0);
3812 }
3813 }
3814
3815 #[tokio::test]
3816 async fn test_request_instruments_with_end_only() {
3817 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3819 set_data_event_sender(sender);
3820
3821 let client_id = ClientId::from("DYDX-TS-END-ONLY");
3822 let config = DydxDataClientConfig::default();
3823 let http_client = DydxHttpClient::default();
3824 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3825
3826 let now = Utc::now();
3827 let end = Some(now);
3828
3829 let request = RequestInstruments::new(
3830 None,
3831 end,
3832 Some(client_id),
3833 Some(*DYDX_VENUE),
3834 UUID4::new(),
3835 get_atomic_clock_realtime().get_time_ns(),
3836 None,
3837 );
3838
3839 assert!(client.request_instruments(&request).is_ok());
3840
3841 let timeout = tokio::time::Duration::from_secs(3);
3842 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3843 tokio::time::timeout(timeout, rx.recv()).await
3844 {
3845 assert!(resp.start.is_none());
3846 assert!(resp.end.is_some());
3847 assert!(resp.ts_init > 0);
3848 }
3849 }
3850
3851 #[tokio::test]
3852 async fn test_request_instruments_client_id_fallback() {
3853 setup_test_env();
3855
3856 let client_id = ClientId::from("DYDX-FALLBACK-TEST");
3857 let config = DydxDataClientConfig::default();
3858 let http_client = DydxHttpClient::default();
3859 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3860
3861 let request = RequestInstruments::new(
3862 None,
3863 None,
3864 None, Some(*DYDX_VENUE),
3866 UUID4::new(),
3867 get_atomic_clock_realtime().get_time_ns(),
3868 None,
3869 );
3870
3871 assert!(client.request_instruments(&request).is_ok());
3873 }
3874
3875 #[tokio::test]
3876 async fn test_request_instruments_with_params() {
3877 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3879 set_data_event_sender(sender);
3880
3881 let client_id = ClientId::from("DYDX-PARAMS-TEST");
3882 let config = DydxDataClientConfig::default();
3883 let http_client = DydxHttpClient::default();
3884 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3885
3886 let mut params_map = IndexMap::new();
3888 params_map.insert("test_key".to_string(), "test_value".to_string());
3889
3890 let request = RequestInstruments::new(
3891 None,
3892 None,
3893 Some(client_id),
3894 Some(*DYDX_VENUE),
3895 UUID4::new(),
3896 get_atomic_clock_realtime().get_time_ns(),
3897 Some(params_map),
3898 );
3899
3900 assert!(client.request_instruments(&request).is_ok());
3901
3902 let timeout = tokio::time::Duration::from_secs(3);
3904 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3905 tokio::time::timeout(timeout, rx.recv()).await
3906 {
3907 assert_eq!(resp.client_id, client_id);
3909 let params = resp
3910 .params
3911 .expect("expected params to be present in InstrumentsResponse");
3912 assert_eq!(
3913 params.get("test_key").map(String::as_str),
3914 Some("test_value")
3915 );
3916 }
3917 }
3918
3919 #[tokio::test]
3920 async fn test_request_instruments_with_start_and_end_range() {
3921 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3923 set_data_event_sender(sender);
3924
3925 let client_id = ClientId::from("DYDX-START-END-RANGE");
3926 let config = DydxDataClientConfig::default();
3927 let http_client = DydxHttpClient::default();
3928 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
3929
3930 let now = Utc::now();
3931 let start = Some(now - chrono::Duration::hours(48));
3932 let end = Some(now - chrono::Duration::hours(24));
3933
3934 let request = RequestInstruments::new(
3935 start,
3936 end,
3937 Some(client_id),
3938 Some(*DYDX_VENUE),
3939 UUID4::new(),
3940 get_atomic_clock_realtime().get_time_ns(),
3941 None,
3942 );
3943
3944 assert!(client.request_instruments(&request).is_ok());
3945
3946 let timeout = tokio::time::Duration::from_secs(3);
3947 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
3948 tokio::time::timeout(timeout, rx.recv()).await
3949 {
3950 assert!(
3952 resp.start.is_some(),
3953 "start timestamp should be present when provided"
3954 );
3955 assert!(
3956 resp.end.is_some(),
3957 "end timestamp should be present when provided"
3958 );
3959 assert!(resp.ts_init > 0, "ts_init should always be set");
3960
3961 if let (Some(start_ts), Some(end_ts)) = (resp.start, resp.end) {
3963 assert!(
3964 start_ts < end_ts,
3965 "start timestamp should be before end timestamp"
3966 );
3967 }
3968 }
3969 }
3970
3971 #[tokio::test]
3972 async fn test_request_instruments_different_client_ids() {
3973 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
3975 set_data_event_sender(sender);
3976
3977 let timeout = tokio::time::Duration::from_secs(3);
3978
3979 let client_id_1 = ClientId::from("DYDX-CLIENT-1");
3981 let config1 = DydxDataClientConfig::default();
3982 let http_client1 = DydxHttpClient::default();
3983 let client1 = DydxDataClient::new(client_id_1, config1, http_client1, None).unwrap();
3984
3985 let request1 = RequestInstruments::new(
3986 None,
3987 None,
3988 Some(client_id_1),
3989 Some(*DYDX_VENUE),
3990 UUID4::new(),
3991 get_atomic_clock_realtime().get_time_ns(),
3992 None,
3993 );
3994
3995 assert!(client1.request_instruments(&request1).is_ok());
3996
3997 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp1)))) =
3998 tokio::time::timeout(timeout, rx.recv()).await
3999 {
4000 assert_eq!(
4001 resp1.client_id, client_id_1,
4002 "Response should contain client_id_1"
4003 );
4004 }
4005
4006 let client_id_2 = ClientId::from("DYDX-CLIENT-2");
4008 let config2 = DydxDataClientConfig::default();
4009 let http_client2 = DydxHttpClient::default();
4010 let client2 = DydxDataClient::new(client_id_2, config2, http_client2, None).unwrap();
4011
4012 let request2 = RequestInstruments::new(
4013 None,
4014 None,
4015 Some(client_id_2),
4016 Some(*DYDX_VENUE),
4017 UUID4::new(),
4018 get_atomic_clock_realtime().get_time_ns(),
4019 None,
4020 );
4021
4022 assert!(client2.request_instruments(&request2).is_ok());
4023
4024 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp2)))) =
4025 tokio::time::timeout(timeout, rx.recv()).await
4026 {
4027 assert_eq!(
4028 resp2.client_id, client_id_2,
4029 "Response should contain client_id_2"
4030 );
4031 assert_ne!(
4032 resp2.client_id, client_id_1,
4033 "Different clients should have different client_ids"
4034 );
4035 }
4036 }
4037
4038 #[tokio::test]
4039 async fn test_request_instruments_no_timestamps() {
4040 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4042 set_data_event_sender(sender);
4043
4044 let client_id = ClientId::from("DYDX-NO-TIMESTAMPS");
4045 let config = DydxDataClientConfig::default();
4046 let http_client = DydxHttpClient::default();
4047 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4048
4049 let request = RequestInstruments::new(
4050 None, None, Some(client_id),
4053 Some(*DYDX_VENUE),
4054 UUID4::new(),
4055 get_atomic_clock_realtime().get_time_ns(),
4056 None,
4057 );
4058
4059 assert!(client.request_instruments(&request).is_ok());
4060
4061 let timeout = tokio::time::Duration::from_secs(5);
4062 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4063 tokio::time::timeout(timeout, rx.recv()).await
4064 {
4065 assert!(
4067 resp.start.is_none(),
4068 "start should be None when not provided"
4069 );
4070 assert!(resp.end.is_none(), "end should be None when not provided");
4071
4072 assert_eq!(resp.venue, *DYDX_VENUE);
4074 assert_eq!(resp.client_id, client_id);
4075 assert!(resp.ts_init > 0);
4076 }
4077 }
4078
4079 #[tokio::test]
4080 async fn test_request_instrument_cache_hit() {
4081 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4083 set_data_event_sender(sender);
4084
4085 let client_id = ClientId::from("DYDX-CACHE-HIT");
4086 let config = DydxDataClientConfig::default();
4087 let http_client = DydxHttpClient::default();
4088 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4089
4090 let instrument = create_test_instrument_any();
4092 let instrument_id = instrument.id();
4093 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4094 client.instruments.insert(symbol_key, instrument.clone());
4095
4096 let request = RequestInstrument::new(
4097 instrument_id,
4098 None,
4099 None,
4100 Some(client_id),
4101 UUID4::new(),
4102 get_atomic_clock_realtime().get_time_ns(),
4103 None,
4104 );
4105
4106 assert!(client.request_instrument(&request).is_ok());
4107
4108 let timeout = tokio::time::Duration::from_millis(500);
4110 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4111 tokio::time::timeout(timeout, rx.recv()).await
4112 {
4113 assert_eq!(resp.instrument_id, instrument_id);
4114 assert_eq!(resp.client_id, client_id);
4115 assert_eq!(resp.data.id(), instrument_id);
4116 }
4117 }
4118
4119 #[tokio::test]
4120 async fn test_request_instrument_cache_miss() {
4121 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4123 set_data_event_sender(sender);
4124
4125 let client_id = ClientId::from("DYDX-CACHE-MISS");
4126 let config = DydxDataClientConfig::default();
4127 let http_client = DydxHttpClient::default();
4128 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4129
4130 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
4131
4132 let request = RequestInstrument::new(
4133 instrument_id,
4134 None,
4135 None,
4136 Some(client_id),
4137 UUID4::new(),
4138 get_atomic_clock_realtime().get_time_ns(),
4139 None,
4140 );
4141
4142 assert!(client.request_instrument(&request).is_ok());
4143
4144 let timeout = tokio::time::Duration::from_secs(5);
4146 let result = tokio::time::timeout(timeout, rx.recv()).await;
4147
4148 match result {
4150 Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) => {
4151 assert_eq!(resp.instrument_id, instrument_id);
4152 assert_eq!(resp.client_id, client_id);
4153 }
4154 Ok(Some(_)) => panic!("Expected InstrumentResponse"),
4155 Ok(None) => panic!("Channel closed unexpectedly"),
4156 Err(_) => {
4157 println!("Test timed out - testnet may be unreachable");
4158 }
4159 }
4160 }
4161
4162 #[tokio::test]
4163 async fn test_request_instrument_not_found() {
4164 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4166 set_data_event_sender(sender);
4167
4168 let client_id = ClientId::from("DYDX-NOT-FOUND");
4169 let config = DydxDataClientConfig {
4170 base_url_http: Some("http://invalid-url.local".to_string()),
4171 ..Default::default()
4172 };
4173 let http_client = DydxHttpClient::new(
4174 config.base_url_http.clone(),
4175 config.http_timeout_secs,
4176 config.http_proxy_url.clone(),
4177 config.is_testnet,
4178 None,
4179 )
4180 .unwrap();
4181
4182 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4183
4184 let instrument_id = InstrumentId::from("INVALID-SYMBOL.DYDX");
4185
4186 let request = RequestInstrument::new(
4187 instrument_id,
4188 None,
4189 None,
4190 Some(client_id),
4191 UUID4::new(),
4192 get_atomic_clock_realtime().get_time_ns(),
4193 None,
4194 );
4195
4196 assert!(client.request_instrument(&request).is_ok());
4198
4199 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
4201 }
4202
4203 #[tokio::test]
4204 async fn test_request_instrument_bulk_caching() {
4205 setup_test_env();
4207
4208 let client_id = ClientId::from("DYDX-BULK-CACHE");
4209 let config = DydxDataClientConfig::default();
4210 let http_client = DydxHttpClient::default();
4211 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4212
4213 let initial_cache_size = client.instruments.len();
4214
4215 let instrument_id = InstrumentId::from("ETH-USD-PERP.DYDX");
4216
4217 let request = RequestInstrument::new(
4218 instrument_id,
4219 None,
4220 None,
4221 Some(client_id),
4222 UUID4::new(),
4223 get_atomic_clock_realtime().get_time_ns(),
4224 None,
4225 );
4226
4227 assert!(client.request_instrument(&request).is_ok());
4228
4229 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
4231
4232 let final_cache_size = client.instruments.len();
4234 assert!(final_cache_size >= initial_cache_size);
4235 }
4237
4238 #[tokio::test]
4239 async fn test_request_instrument_correlation_id() {
4240 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4242 set_data_event_sender(sender);
4243
4244 let client_id = ClientId::from("DYDX-CORR-ID");
4245 let config = DydxDataClientConfig::default();
4246 let http_client = DydxHttpClient::default();
4247 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4248
4249 let instrument = create_test_instrument_any();
4251 let instrument_id = instrument.id();
4252 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4253 client.instruments.insert(symbol_key, instrument.clone());
4254
4255 let request_id = UUID4::new();
4256 let request = RequestInstrument::new(
4257 instrument_id,
4258 None,
4259 None,
4260 Some(client_id),
4261 request_id,
4262 get_atomic_clock_realtime().get_time_ns(),
4263 None,
4264 );
4265
4266 assert!(client.request_instrument(&request).is_ok());
4267
4268 let timeout = tokio::time::Duration::from_millis(500);
4270 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4271 tokio::time::timeout(timeout, rx.recv()).await
4272 {
4273 assert_eq!(resp.correlation_id, request_id);
4274 }
4275 }
4276
4277 #[tokio::test]
4278 async fn test_request_instrument_response_format_boxed() {
4279 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4281 set_data_event_sender(sender);
4282
4283 let client_id = ClientId::from("DYDX-BOXED");
4284 let config = DydxDataClientConfig::default();
4285 let http_client = DydxHttpClient::default();
4286 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4287
4288 let instrument = create_test_instrument_any();
4290 let instrument_id = instrument.id();
4291 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4292 client.instruments.insert(symbol_key, instrument.clone());
4293
4294 let request = RequestInstrument::new(
4295 instrument_id,
4296 None,
4297 None,
4298 Some(client_id),
4299 UUID4::new(),
4300 get_atomic_clock_realtime().get_time_ns(),
4301 None,
4302 );
4303
4304 assert!(client.request_instrument(&request).is_ok());
4305
4306 let timeout = tokio::time::Duration::from_millis(500);
4308 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
4309 tokio::time::timeout(timeout, rx.recv()).await
4310 {
4311 assert_eq!(boxed_resp.instrument_id, instrument_id);
4313 assert_eq!(boxed_resp.client_id, client_id);
4314 assert!(boxed_resp.start.is_none());
4315 assert!(boxed_resp.end.is_none());
4316 assert!(boxed_resp.ts_init > 0);
4317 }
4318 }
4319
4320 #[rstest]
4321 fn test_request_instrument_symbol_extraction() {
4322 setup_test_env();
4324
4325 let client_id = ClientId::from("DYDX-SYMBOL");
4326 let config = DydxDataClientConfig::default();
4327 let http_client = DydxHttpClient::default();
4328 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4329
4330 let test_cases = vec![
4333 ("BTC-USD-PERP.DYDX", "BTC-USD-PERP"),
4334 ("ETH-USD-PERP.DYDX", "ETH-USD-PERP"),
4335 ("SOL-USD-PERP.DYDX", "SOL-USD-PERP"),
4336 ];
4337
4338 for (instrument_id_str, expected_symbol) in test_cases {
4339 let instrument_id = InstrumentId::from(instrument_id_str);
4340 let symbol = Ustr::from(instrument_id.symbol.as_str());
4341 assert_eq!(symbol.as_str(), expected_symbol);
4342 }
4343 }
4344
4345 #[tokio::test]
4346 async fn test_request_instrument_client_id_fallback() {
4347 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4349 set_data_event_sender(sender);
4350
4351 let client_id = ClientId::from("DYDX-FALLBACK");
4352 let config = DydxDataClientConfig::default();
4353 let http_client = DydxHttpClient::default();
4354 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4355
4356 let instrument = create_test_instrument_any();
4358 let instrument_id = instrument.id();
4359 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4360 client.instruments.insert(symbol_key, instrument.clone());
4361
4362 let request = RequestInstrument::new(
4363 instrument_id,
4364 None,
4365 None,
4366 None, UUID4::new(),
4368 get_atomic_clock_realtime().get_time_ns(),
4369 None,
4370 );
4371
4372 assert!(client.request_instrument(&request).is_ok());
4373
4374 let timeout = tokio::time::Duration::from_millis(500);
4376 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
4377 tokio::time::timeout(timeout, rx.recv()).await
4378 {
4379 assert_eq!(resp.client_id, client_id);
4380 }
4381 }
4382
4383 #[tokio::test]
4384 async fn test_request_trades_success_with_limit_and_symbol_conversion() {
4385 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4386 set_data_event_sender(sender);
4387
4388 let created_at = Utc::now();
4389
4390 let http_trade = crate::http::models::Trade {
4391 id: "trade-1".to_string(),
4392 side: OrderSide::Buy,
4393 size: dec!(1.5),
4394 price: dec!(100.25),
4395 created_at,
4396 created_at_height: 1,
4397 trade_type: crate::common::enums::DydxTradeType::Limit,
4398 };
4399
4400 let trades_response = crate::http::models::TradesResponse {
4401 trades: vec![http_trade],
4402 };
4403
4404 let state = TradesTestState {
4405 response: Arc::new(trades_response),
4406 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4407 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4408 };
4409
4410 let addr = start_trades_test_server(state.clone()).await;
4411 let base_url = format!("http://{addr}");
4412
4413 let client_id = ClientId::from("DYDX-TRADES-SUCCESS");
4414 let config = DydxDataClientConfig {
4415 base_url_http: Some(base_url),
4416 is_testnet: true,
4417 ..Default::default()
4418 };
4419
4420 let http_client = DydxHttpClient::new(
4421 config.base_url_http.clone(),
4422 config.http_timeout_secs,
4423 config.http_proxy_url.clone(),
4424 config.is_testnet,
4425 None,
4426 )
4427 .unwrap();
4428
4429 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4430
4431 let instrument = create_test_instrument_any();
4432 let instrument_id = instrument.id();
4433 let price_precision = instrument.price_precision();
4434 let size_precision = instrument.size_precision();
4435 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4436 client.instruments.insert(symbol_key, instrument);
4437
4438 let request_id = UUID4::new();
4439 let now = Utc::now();
4440 let start = Some(now - chrono::Duration::seconds(10));
4441 let end = Some(now + chrono::Duration::seconds(10));
4442 let limit = std::num::NonZeroUsize::new(100).unwrap();
4443
4444 let request = RequestTrades::new(
4445 instrument_id,
4446 start,
4447 end,
4448 Some(limit),
4449 Some(client_id),
4450 request_id,
4451 get_atomic_clock_realtime().get_time_ns(),
4452 None,
4453 );
4454
4455 assert!(client.request_trades(&request).is_ok());
4456
4457 let timeout = tokio::time::Duration::from_secs(1);
4458 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4459 tokio::time::timeout(timeout, rx.recv()).await
4460 {
4461 assert_eq!(resp.correlation_id, request_id);
4462 assert_eq!(resp.client_id, client_id);
4463 assert_eq!(resp.instrument_id, instrument_id);
4464 assert_eq!(resp.data.len(), 1);
4465
4466 let tick = &resp.data[0];
4467 assert_eq!(tick.instrument_id, instrument_id);
4468 assert_eq!(tick.price, Price::new(100.25, price_precision));
4469 assert_eq!(tick.size, Quantity::new(1.5, size_precision));
4470 assert_eq!(tick.trade_id.to_string(), "trade-1");
4471
4472 use nautilus_model::enums::AggressorSide;
4473 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
4474 } else {
4475 panic!("did not receive trades response in time");
4476 }
4477
4478 let last_ticker = state.last_ticker.lock().await.clone();
4480 assert_eq!(last_ticker.as_deref(), Some("BTC-USD"));
4481
4482 let last_limit = *state.last_limit.lock().await;
4483 assert_eq!(last_limit, Some(Some(100)));
4484 }
4485
4486 #[tokio::test]
4487 async fn test_request_trades_empty_response_and_no_limit() {
4488 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4489 set_data_event_sender(sender);
4490
4491 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4492
4493 let state = TradesTestState {
4494 response: Arc::new(trades_response),
4495 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4496 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4497 };
4498
4499 let addr = start_trades_test_server(state.clone()).await;
4500 let base_url = format!("http://{addr}");
4501
4502 let client_id = ClientId::from("DYDX-TRADES-EMPTY");
4503 let config = DydxDataClientConfig {
4504 base_url_http: Some(base_url),
4505 is_testnet: true,
4506 ..Default::default()
4507 };
4508
4509 let http_client = DydxHttpClient::new(
4510 config.base_url_http.clone(),
4511 config.http_timeout_secs,
4512 config.http_proxy_url.clone(),
4513 config.is_testnet,
4514 None,
4515 )
4516 .unwrap();
4517
4518 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4519
4520 let instrument = create_test_instrument_any();
4521 let instrument_id = instrument.id();
4522 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4523 client.instruments.insert(symbol_key, instrument);
4524
4525 let request_id = UUID4::new();
4526
4527 let request = RequestTrades::new(
4528 instrument_id,
4529 None,
4530 None,
4531 None, Some(client_id),
4533 request_id,
4534 get_atomic_clock_realtime().get_time_ns(),
4535 None,
4536 );
4537
4538 assert!(client.request_trades(&request).is_ok());
4539
4540 let timeout = tokio::time::Duration::from_secs(1);
4541 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4542 tokio::time::timeout(timeout, rx.recv()).await
4543 {
4544 assert_eq!(resp.correlation_id, request_id);
4545 assert_eq!(resp.client_id, client_id);
4546 assert_eq!(resp.instrument_id, instrument_id);
4547 assert!(resp.data.is_empty());
4548 } else {
4549 panic!("did not receive trades response in time");
4550 }
4551
4552 let last_limit = *state.last_limit.lock().await;
4554 assert_eq!(last_limit, Some(None));
4555 }
4556
4557 #[tokio::test]
4558 async fn test_request_trades_timestamp_filtering() {
4559 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4560 set_data_event_sender(sender);
4561
4562 let now = Utc::now();
4563 let trade_before = crate::http::models::Trade {
4564 id: "before".to_string(),
4565 side: OrderSide::Buy,
4566 size: dec!(1.0),
4567 price: dec!(100.0),
4568 created_at: now - chrono::Duration::seconds(60),
4569 created_at_height: 1,
4570 trade_type: crate::common::enums::DydxTradeType::Limit,
4571 };
4572 let trade_inside = crate::http::models::Trade {
4573 id: "inside".to_string(),
4574 side: OrderSide::Sell,
4575 size: dec!(2.0),
4576 price: dec!(101.0),
4577 created_at: now,
4578 created_at_height: 2,
4579 trade_type: crate::common::enums::DydxTradeType::Limit,
4580 };
4581 let trade_after = crate::http::models::Trade {
4582 id: "after".to_string(),
4583 side: OrderSide::Buy,
4584 size: dec!(3.0),
4585 price: dec!(102.0),
4586 created_at: now + chrono::Duration::seconds(60),
4587 created_at_height: 3,
4588 trade_type: crate::common::enums::DydxTradeType::Limit,
4589 };
4590
4591 let trades_response = crate::http::models::TradesResponse {
4592 trades: vec![trade_before, trade_inside.clone(), trade_after],
4593 };
4594
4595 let state = TradesTestState {
4596 response: Arc::new(trades_response),
4597 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4598 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4599 };
4600
4601 let addr = start_trades_test_server(state).await;
4602 let base_url = format!("http://{addr}");
4603
4604 let client_id = ClientId::from("DYDX-TRADES-FILTER");
4605 let config = DydxDataClientConfig {
4606 base_url_http: Some(base_url),
4607 is_testnet: true,
4608 ..Default::default()
4609 };
4610
4611 let http_client = DydxHttpClient::new(
4612 config.base_url_http.clone(),
4613 config.http_timeout_secs,
4614 config.http_proxy_url.clone(),
4615 config.is_testnet,
4616 None,
4617 )
4618 .unwrap();
4619
4620 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4621
4622 let instrument = create_test_instrument_any();
4623 let instrument_id = instrument.id();
4624 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4625 client.instruments.insert(symbol_key, instrument);
4626
4627 let request_id = UUID4::new();
4628
4629 let start = Some(now - chrono::Duration::seconds(10));
4631 let end = Some(now + chrono::Duration::seconds(10));
4632
4633 let request = RequestTrades::new(
4634 instrument_id,
4635 start,
4636 end,
4637 None,
4638 Some(client_id),
4639 request_id,
4640 get_atomic_clock_realtime().get_time_ns(),
4641 None,
4642 );
4643
4644 assert!(client.request_trades(&request).is_ok());
4645
4646 let timeout = tokio::time::Duration::from_secs(1);
4647 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4648 tokio::time::timeout(timeout, rx.recv()).await
4649 {
4650 assert_eq!(resp.correlation_id, request_id);
4651 assert_eq!(resp.client_id, client_id);
4652 assert_eq!(resp.instrument_id, instrument_id);
4653 assert_eq!(resp.data.len(), 1);
4654
4655 let tick = &resp.data[0];
4656 assert_eq!(tick.trade_id.to_string(), "inside");
4657 assert_eq!(tick.price.as_decimal(), dec!(101.0));
4658 } else {
4659 panic!("did not receive trades response in time");
4660 }
4661 }
4662
4663 #[tokio::test]
4664 async fn test_request_trades_correlation_id_matching() {
4665 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4667 set_data_event_sender(sender);
4668
4669 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4670
4671 let state = TradesTestState {
4672 response: Arc::new(trades_response),
4673 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4674 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4675 };
4676
4677 let addr = start_trades_test_server(state).await;
4678 let base_url = format!("http://{addr}");
4679
4680 let client_id = ClientId::from("DYDX-TRADES-CORR");
4681 let config = DydxDataClientConfig {
4682 base_url_http: Some(base_url),
4683 is_testnet: true,
4684 ..Default::default()
4685 };
4686
4687 let http_client = DydxHttpClient::new(
4688 config.base_url_http.clone(),
4689 config.http_timeout_secs,
4690 config.http_proxy_url.clone(),
4691 config.is_testnet,
4692 None,
4693 )
4694 .unwrap();
4695
4696 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4697
4698 let instrument = create_test_instrument_any();
4699 let instrument_id = instrument.id();
4700 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4701 client.instruments.insert(symbol_key, instrument);
4702
4703 let request_id = UUID4::new();
4704 let request = RequestTrades::new(
4705 instrument_id,
4706 None,
4707 None,
4708 None,
4709 Some(client_id),
4710 request_id,
4711 get_atomic_clock_realtime().get_time_ns(),
4712 None,
4713 );
4714
4715 assert!(client.request_trades(&request).is_ok());
4716
4717 let timeout = tokio::time::Duration::from_millis(500);
4718 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4719 tokio::time::timeout(timeout, rx.recv()).await
4720 {
4721 assert_eq!(resp.correlation_id, request_id);
4722 }
4723 }
4724
4725 #[tokio::test]
4726 async fn test_request_trades_response_format() {
4727 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4729 set_data_event_sender(sender);
4730
4731 let created_at = Utc::now();
4732 let http_trade = crate::http::models::Trade {
4733 id: "format-test".to_string(),
4734 side: OrderSide::Sell,
4735 size: dec!(5.0),
4736 price: dec!(200.0),
4737 created_at,
4738 created_at_height: 100,
4739 trade_type: crate::common::enums::DydxTradeType::Limit,
4740 };
4741
4742 let trades_response = crate::http::models::TradesResponse {
4743 trades: vec![http_trade],
4744 };
4745
4746 let state = TradesTestState {
4747 response: Arc::new(trades_response),
4748 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4749 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4750 };
4751
4752 let addr = start_trades_test_server(state).await;
4753 let base_url = format!("http://{addr}");
4754
4755 let client_id = ClientId::from("DYDX-TRADES-FORMAT");
4756 let config = DydxDataClientConfig {
4757 base_url_http: Some(base_url),
4758 is_testnet: true,
4759 ..Default::default()
4760 };
4761
4762 let http_client = DydxHttpClient::new(
4763 config.base_url_http.clone(),
4764 config.http_timeout_secs,
4765 config.http_proxy_url.clone(),
4766 config.is_testnet,
4767 None,
4768 )
4769 .unwrap();
4770
4771 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4772
4773 let instrument = create_test_instrument_any();
4774 let instrument_id = instrument.id();
4775 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4776 client.instruments.insert(symbol_key, instrument);
4777
4778 let request = RequestTrades::new(
4779 instrument_id,
4780 None,
4781 None,
4782 None,
4783 Some(client_id),
4784 UUID4::new(),
4785 get_atomic_clock_realtime().get_time_ns(),
4786 None,
4787 );
4788
4789 assert!(client.request_trades(&request).is_ok());
4790
4791 let timeout = tokio::time::Duration::from_millis(500);
4792 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4793 tokio::time::timeout(timeout, rx.recv()).await
4794 {
4795 assert_eq!(resp.client_id, client_id);
4797 assert_eq!(resp.instrument_id, instrument_id);
4798 assert!(resp.data.len() == 1);
4799 assert!(resp.ts_init > 0);
4800
4801 let tick = &resp.data[0];
4803 assert_eq!(tick.instrument_id, instrument_id);
4804 assert!(tick.ts_event > 0);
4805 assert!(tick.ts_init > 0);
4806 }
4807 }
4808
4809 #[tokio::test]
4810 async fn test_request_trades_no_instrument_in_cache() {
4811 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4813 set_data_event_sender(sender);
4814
4815 let client_id = ClientId::from("DYDX-TRADES-NO-INST");
4816 let config = DydxDataClientConfig::default();
4817 let http_client = DydxHttpClient::default();
4818 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4819
4820 let instrument_id = InstrumentId::from("UNKNOWN-SYMBOL.DYDX");
4822
4823 let request = RequestTrades::new(
4824 instrument_id,
4825 None,
4826 None,
4827 None,
4828 Some(client_id),
4829 UUID4::new(),
4830 get_atomic_clock_realtime().get_time_ns(),
4831 None,
4832 );
4833
4834 assert!(client.request_trades(&request).is_ok());
4835
4836 let timeout = tokio::time::Duration::from_millis(500);
4838 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
4839 tokio::time::timeout(timeout, rx.recv()).await
4840 {
4841 assert!(resp.data.is_empty());
4842 }
4843 }
4844
4845 #[tokio::test]
4846 async fn test_request_trades_limit_parameter() {
4847 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4849 set_data_event_sender(sender);
4850
4851 let trades_response = crate::http::models::TradesResponse { trades: vec![] };
4852
4853 let state = TradesTestState {
4854 response: Arc::new(trades_response),
4855 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
4856 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
4857 };
4858
4859 let addr = start_trades_test_server(state.clone()).await;
4860 let base_url = format!("http://{addr}");
4861
4862 let client_id = ClientId::from("DYDX-TRADES-LIMIT");
4863 let config = DydxDataClientConfig {
4864 base_url_http: Some(base_url),
4865 is_testnet: true,
4866 ..Default::default()
4867 };
4868
4869 let http_client = DydxHttpClient::new(
4870 config.base_url_http.clone(),
4871 config.http_timeout_secs,
4872 config.http_proxy_url.clone(),
4873 config.is_testnet,
4874 None,
4875 )
4876 .unwrap();
4877
4878 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4879
4880 let instrument = create_test_instrument_any();
4881 let instrument_id = instrument.id();
4882 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
4883 client.instruments.insert(symbol_key, instrument);
4884
4885 let limit = std::num::NonZeroUsize::new(500).unwrap();
4887 let request = RequestTrades::new(
4888 instrument_id,
4889 None,
4890 None,
4891 Some(limit),
4892 Some(client_id),
4893 UUID4::new(),
4894 get_atomic_clock_realtime().get_time_ns(),
4895 None,
4896 );
4897
4898 assert!(client.request_trades(&request).is_ok());
4899 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
4900
4901 let last_limit = *state.last_limit.lock().await;
4903 assert_eq!(last_limit, Some(Some(500)));
4904 }
4905
4906 #[rstest]
4907 fn test_request_trades_symbol_conversion() {
4908 setup_test_env();
4910
4911 let client_id = ClientId::from("DYDX-SYMBOL-CONV");
4912 let config = DydxDataClientConfig::default();
4913 let http_client = DydxHttpClient::default();
4914 let _client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4915
4916 let test_cases = vec![
4918 ("BTC-USD-PERP.DYDX", "BTC-USD"),
4919 ("ETH-USD-PERP.DYDX", "ETH-USD"),
4920 ("SOL-USD-PERP.DYDX", "SOL-USD"),
4921 ];
4922
4923 for (instrument_id_str, expected_ticker) in test_cases {
4924 let instrument_id = InstrumentId::from(instrument_id_str);
4925 let ticker = instrument_id
4926 .symbol
4927 .as_str()
4928 .trim_end_matches("-PERP")
4929 .to_string();
4930 assert_eq!(ticker, expected_ticker);
4931 }
4932 }
4933
4934 #[tokio::test]
4935 async fn test_http_404_handling() {
4936 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4938 set_data_event_sender(sender);
4939
4940 let client_id = ClientId::from("DYDX-404");
4941 let config = DydxDataClientConfig {
4942 base_url_http: Some("http://localhost:1/nonexistent".to_string()),
4943 http_timeout_secs: Some(1),
4944 ..Default::default()
4945 };
4946
4947 let http_client = DydxHttpClient::new(
4948 config.base_url_http.clone(),
4949 config.http_timeout_secs,
4950 config.http_proxy_url.clone(),
4951 config.is_testnet,
4952 None,
4953 )
4954 .unwrap();
4955
4956 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
4957
4958 let request = RequestInstruments::new(
4959 None,
4960 None,
4961 Some(client_id),
4962 Some(*DYDX_VENUE),
4963 UUID4::new(),
4964 get_atomic_clock_realtime().get_time_ns(),
4965 None,
4966 );
4967
4968 assert!(client.request_instruments(&request).is_ok());
4969
4970 let timeout = tokio::time::Duration::from_secs(2);
4972 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
4973 tokio::time::timeout(timeout, rx.recv()).await
4974 {
4975 assert!(resp.data.is_empty(), "Expected empty response on 404");
4976 }
4977 }
4978
4979 #[tokio::test]
4980 async fn test_http_500_handling() {
4981 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
4983 set_data_event_sender(sender);
4984
4985 let client_id = ClientId::from("DYDX-500");
4986 let config = DydxDataClientConfig {
4987 base_url_http: Some("http://httpstat.us/500".to_string()),
4988 http_timeout_secs: Some(2),
4989 ..Default::default()
4990 };
4991
4992 let http_client = DydxHttpClient::new(
4993 config.base_url_http.clone(),
4994 config.http_timeout_secs,
4995 config.http_proxy_url.clone(),
4996 config.is_testnet,
4997 None,
4998 )
4999 .unwrap();
5000
5001 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5002
5003 let instrument = create_test_instrument_any();
5004 let instrument_id = instrument.id();
5005 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5006 client.instruments.insert(symbol_key, instrument);
5007
5008 let request = RequestTrades::new(
5009 instrument_id,
5010 None,
5011 None,
5012 None,
5013 Some(client_id),
5014 UUID4::new(),
5015 get_atomic_clock_realtime().get_time_ns(),
5016 None,
5017 );
5018
5019 assert!(client.request_trades(&request).is_ok());
5020
5021 let timeout = tokio::time::Duration::from_secs(3);
5023 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5024 tokio::time::timeout(timeout, rx.recv()).await
5025 {
5026 assert!(resp.data.is_empty(), "Expected empty response on 500");
5027 }
5028 }
5029
5030 #[tokio::test]
5031 async fn test_network_timeout_handling() {
5032 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5034 set_data_event_sender(sender);
5035
5036 let client_id = ClientId::from("DYDX-TIMEOUT");
5037 let config = DydxDataClientConfig {
5038 base_url_http: Some("http://10.255.255.1:81".to_string()), http_timeout_secs: Some(1), ..Default::default()
5041 };
5042
5043 let http_client = DydxHttpClient::new(
5044 config.base_url_http.clone(),
5045 config.http_timeout_secs,
5046 config.http_proxy_url.clone(),
5047 config.is_testnet,
5048 None,
5049 )
5050 .unwrap();
5051
5052 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5053
5054 let request = RequestInstruments::new(
5055 None,
5056 None,
5057 Some(client_id),
5058 Some(*DYDX_VENUE),
5059 UUID4::new(),
5060 get_atomic_clock_realtime().get_time_ns(),
5061 None,
5062 );
5063
5064 assert!(client.request_instruments(&request).is_ok());
5065
5066 let timeout = tokio::time::Duration::from_secs(3);
5068 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5069 tokio::time::timeout(timeout, rx.recv()).await
5070 {
5071 assert!(resp.data.is_empty(), "Expected empty response on timeout");
5072 }
5073 }
5074
5075 #[tokio::test]
5076 async fn test_connection_refused_handling() {
5077 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5079 set_data_event_sender(sender);
5080
5081 let client_id = ClientId::from("DYDX-REFUSED");
5082 let config = DydxDataClientConfig {
5083 base_url_http: Some("http://localhost:9999".to_string()), http_timeout_secs: Some(1),
5085 ..Default::default()
5086 };
5087
5088 let http_client = DydxHttpClient::new(
5089 config.base_url_http.clone(),
5090 config.http_timeout_secs,
5091 config.http_proxy_url.clone(),
5092 config.is_testnet,
5093 None,
5094 )
5095 .unwrap();
5096
5097 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5098
5099 let instrument = create_test_instrument_any();
5100 let instrument_id = instrument.id();
5101 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5102 client.instruments.insert(symbol_key, instrument);
5103
5104 let request = RequestInstrument::new(
5105 instrument_id,
5106 None,
5107 None,
5108 Some(client_id),
5109 UUID4::new(),
5110 get_atomic_clock_realtime().get_time_ns(),
5111 None,
5112 );
5113
5114 assert!(client.request_instrument(&request).is_ok());
5115
5116 let timeout = tokio::time::Duration::from_secs(2);
5118 let result = tokio::time::timeout(timeout, rx.recv()).await;
5119
5120 match result {
5123 Ok(Some(DataEvent::Response(_))) => {
5124 }
5126 Ok(None) | Err(_) => {
5127 }
5129 _ => {}
5130 }
5131 }
5132
5133 #[tokio::test]
5134 async fn test_dns_resolution_failure_handling() {
5135 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5137 set_data_event_sender(sender);
5138
5139 let client_id = ClientId::from("DYDX-DNS");
5140 let config = DydxDataClientConfig {
5141 base_url_http: Some(
5142 "http://this-domain-definitely-does-not-exist-12345.invalid".to_string(),
5143 ),
5144 http_timeout_secs: Some(2),
5145 ..Default::default()
5146 };
5147
5148 let http_client = DydxHttpClient::new(
5149 config.base_url_http.clone(),
5150 config.http_timeout_secs,
5151 config.http_proxy_url.clone(),
5152 config.is_testnet,
5153 None,
5154 )
5155 .unwrap();
5156
5157 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5158
5159 let request = RequestInstruments::new(
5160 None,
5161 None,
5162 Some(client_id),
5163 Some(*DYDX_VENUE),
5164 UUID4::new(),
5165 get_atomic_clock_realtime().get_time_ns(),
5166 None,
5167 );
5168
5169 assert!(client.request_instruments(&request).is_ok());
5170
5171 let timeout = tokio::time::Duration::from_secs(3);
5173 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5174 tokio::time::timeout(timeout, rx.recv()).await
5175 {
5176 assert!(
5177 resp.data.is_empty(),
5178 "Expected empty response on DNS failure"
5179 );
5180 }
5181 }
5182
5183 #[tokio::test]
5184 async fn test_http_502_503_handling() {
5185 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5187 set_data_event_sender(sender);
5188
5189 let client_id = ClientId::from("DYDX-503");
5190 let config = DydxDataClientConfig {
5191 base_url_http: Some("http://httpstat.us/503".to_string()),
5192 http_timeout_secs: Some(2),
5193 ..Default::default()
5194 };
5195
5196 let http_client = DydxHttpClient::new(
5197 config.base_url_http.clone(),
5198 config.http_timeout_secs,
5199 config.http_proxy_url.clone(),
5200 config.is_testnet,
5201 None,
5202 )
5203 .unwrap();
5204
5205 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5206
5207 let request = RequestInstruments::new(
5208 None,
5209 None,
5210 Some(client_id),
5211 Some(*DYDX_VENUE),
5212 UUID4::new(),
5213 get_atomic_clock_realtime().get_time_ns(),
5214 None,
5215 );
5216
5217 assert!(client.request_instruments(&request).is_ok());
5218
5219 let timeout = tokio::time::Duration::from_secs(3);
5221 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5222 tokio::time::timeout(timeout, rx.recv()).await
5223 {
5224 assert!(resp.data.is_empty(), "Expected empty response on 503");
5225 }
5226 }
5227
5228 #[tokio::test]
5229 async fn test_http_429_rate_limit_handling() {
5230 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5232 set_data_event_sender(sender);
5233
5234 let client_id = ClientId::from("DYDX-429");
5235 let config = DydxDataClientConfig {
5236 base_url_http: Some("http://httpstat.us/429".to_string()),
5237 http_timeout_secs: Some(2),
5238 ..Default::default()
5239 };
5240
5241 let http_client = DydxHttpClient::new(
5242 config.base_url_http.clone(),
5243 config.http_timeout_secs,
5244 config.http_proxy_url.clone(),
5245 config.is_testnet,
5246 None,
5247 )
5248 .unwrap();
5249
5250 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5251
5252 let instrument = create_test_instrument_any();
5253 let instrument_id = instrument.id();
5254 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5255 client.instruments.insert(symbol_key, instrument);
5256
5257 let request = RequestTrades::new(
5258 instrument_id,
5259 None,
5260 None,
5261 None,
5262 Some(client_id),
5263 UUID4::new(),
5264 get_atomic_clock_realtime().get_time_ns(),
5265 None,
5266 );
5267
5268 assert!(client.request_trades(&request).is_ok());
5269
5270 let timeout = tokio::time::Duration::from_secs(3);
5272 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5273 tokio::time::timeout(timeout, rx.recv()).await
5274 {
5275 assert!(
5276 resp.data.is_empty(),
5277 "Expected empty response on rate limit"
5278 );
5279 }
5280 }
5281
5282 #[tokio::test]
5283 async fn test_error_handling_does_not_panic() {
5284 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5286 set_data_event_sender(sender);
5287
5288 let client_id = ClientId::from("DYDX-NO-PANIC");
5289 let config = DydxDataClientConfig {
5290 base_url_http: Some("http://invalid".to_string()),
5291 ..Default::default()
5292 };
5293
5294 let http_client = DydxHttpClient::new(
5295 config.base_url_http.clone(),
5296 config.http_timeout_secs,
5297 config.http_proxy_url.clone(),
5298 config.is_testnet,
5299 None,
5300 )
5301 .unwrap();
5302
5303 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5304
5305 let request_instruments = RequestInstruments::new(
5307 None,
5308 None,
5309 Some(client_id),
5310 Some(*DYDX_VENUE),
5311 UUID4::new(),
5312 get_atomic_clock_realtime().get_time_ns(),
5313 None,
5314 );
5315 assert!(client.request_instruments(&request_instruments).is_ok());
5316
5317 let instrument_id = InstrumentId::from("INVALID.DYDX");
5318 let request_instrument = RequestInstrument::new(
5319 instrument_id,
5320 None,
5321 None,
5322 Some(client_id),
5323 UUID4::new(),
5324 get_atomic_clock_realtime().get_time_ns(),
5325 None,
5326 );
5327 assert!(client.request_instrument(&request_instrument).is_ok());
5328
5329 let request_trades = RequestTrades::new(
5330 instrument_id,
5331 None,
5332 None,
5333 None,
5334 Some(client_id),
5335 UUID4::new(),
5336 get_atomic_clock_realtime().get_time_ns(),
5337 None,
5338 );
5339 assert!(client.request_trades(&request_trades).is_ok());
5340 }
5341
5342 #[tokio::test]
5343 async fn test_malformed_json_response() {
5344 use axum::{Router, routing::get};
5346
5347 #[derive(Clone)]
5348 struct MalformedState;
5349
5350 async fn malformed_markets_handler() -> String {
5351 r#"{"markets": {"BTC-USD": {"ticker": "BTC-USD""#.to_string()
5353 }
5354
5355 let app = Router::new()
5356 .route("/v4/markets", get(malformed_markets_handler))
5357 .with_state(MalformedState);
5358
5359 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5360 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5361 let port = listener.local_addr().unwrap().port();
5362
5363 tokio::spawn(async move {
5364 axum::serve(listener, app).await.unwrap();
5365 });
5366
5367 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5368
5369 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5370 set_data_event_sender(sender);
5371
5372 let client_id = ClientId::from("DYDX-MALFORMED");
5373 let config = DydxDataClientConfig {
5374 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5375 http_timeout_secs: Some(2),
5376 ..Default::default()
5377 };
5378
5379 let http_client = DydxHttpClient::new(
5380 config.base_url_http.clone(),
5381 config.http_timeout_secs,
5382 config.http_proxy_url.clone(),
5383 config.is_testnet,
5384 None,
5385 )
5386 .unwrap();
5387
5388 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5389
5390 let request = RequestInstruments::new(
5391 None,
5392 None,
5393 Some(client_id),
5394 Some(*DYDX_VENUE),
5395 UUID4::new(),
5396 get_atomic_clock_realtime().get_time_ns(),
5397 None,
5398 );
5399
5400 assert!(client.request_instruments(&request).is_ok());
5401
5402 let timeout = tokio::time::Duration::from_secs(3);
5404 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5405 tokio::time::timeout(timeout, rx.recv()).await
5406 {
5407 assert!(
5408 resp.data.is_empty(),
5409 "Expected empty response on malformed JSON"
5410 );
5411 }
5412 }
5413
5414 #[tokio::test]
5415 async fn test_missing_required_fields_in_response() {
5416 use axum::{Json, Router, routing::get};
5418 use serde_json::{Value, json};
5419
5420 #[derive(Clone)]
5421 struct MissingFieldsState;
5422
5423 async fn missing_fields_handler() -> Json<Value> {
5424 Json(json!({
5426 "markets": {
5427 "BTC-USD": {
5428 "status": "ACTIVE",
5430 "baseAsset": "BTC",
5431 "quoteAsset": "USD",
5432 }
5434 }
5435 }))
5436 }
5437
5438 let app = Router::new()
5439 .route("/v4/markets", get(missing_fields_handler))
5440 .with_state(MissingFieldsState);
5441
5442 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5443 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5444 let port = listener.local_addr().unwrap().port();
5445
5446 tokio::spawn(async move {
5447 axum::serve(listener, app).await.unwrap();
5448 });
5449
5450 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5451
5452 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5453 set_data_event_sender(sender);
5454
5455 let client_id = ClientId::from("DYDX-MISSING");
5456 let config = DydxDataClientConfig {
5457 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5458 http_timeout_secs: Some(2),
5459 ..Default::default()
5460 };
5461
5462 let http_client = DydxHttpClient::new(
5463 config.base_url_http.clone(),
5464 config.http_timeout_secs,
5465 config.http_proxy_url.clone(),
5466 config.is_testnet,
5467 None,
5468 )
5469 .unwrap();
5470
5471 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5472
5473 let request = RequestInstruments::new(
5474 None,
5475 None,
5476 Some(client_id),
5477 Some(*DYDX_VENUE),
5478 UUID4::new(),
5479 get_atomic_clock_realtime().get_time_ns(),
5480 None,
5481 );
5482
5483 assert!(client.request_instruments(&request).is_ok());
5484
5485 let timeout = tokio::time::Duration::from_secs(3);
5487 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5488 tokio::time::timeout(timeout, rx.recv()).await
5489 {
5490 assert!(resp.correlation_id == request.request_id);
5493 }
5494 }
5495
5496 #[tokio::test]
5497 async fn test_invalid_data_types_in_response() {
5498 use axum::{Json, Router, routing::get};
5500 use serde_json::{Value, json};
5501
5502 #[derive(Clone)]
5503 struct InvalidTypesState;
5504
5505 async fn invalid_types_handler() -> Json<Value> {
5506 Json(json!({
5508 "markets": {
5509 "BTC-USD": {
5510 "ticker": "BTC-USD",
5511 "status": "ACTIVE",
5512 "baseAsset": "BTC",
5513 "quoteAsset": "USD",
5514 "stepSize": "not_a_number", "tickSize": true, "minOrderSize": ["array"], "market": 12345, }
5519 }
5520 }))
5521 }
5522
5523 let app = Router::new()
5524 .route("/v4/markets", get(invalid_types_handler))
5525 .with_state(InvalidTypesState);
5526
5527 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5528 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5529 let port = listener.local_addr().unwrap().port();
5530
5531 tokio::spawn(async move {
5532 axum::serve(listener, app).await.unwrap();
5533 });
5534
5535 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5536
5537 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5538 set_data_event_sender(sender);
5539
5540 let client_id = ClientId::from("DYDX-TYPES");
5541 let config = DydxDataClientConfig {
5542 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5543 http_timeout_secs: Some(2),
5544 ..Default::default()
5545 };
5546
5547 let http_client = DydxHttpClient::new(
5548 config.base_url_http.clone(),
5549 config.http_timeout_secs,
5550 config.http_proxy_url.clone(),
5551 config.is_testnet,
5552 None,
5553 )
5554 .unwrap();
5555
5556 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5557
5558 let request = RequestInstruments::new(
5559 None,
5560 None,
5561 Some(client_id),
5562 Some(*DYDX_VENUE),
5563 UUID4::new(),
5564 get_atomic_clock_realtime().get_time_ns(),
5565 None,
5566 );
5567
5568 assert!(client.request_instruments(&request).is_ok());
5569
5570 let timeout = tokio::time::Duration::from_secs(3);
5572 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5573 tokio::time::timeout(timeout, rx.recv()).await
5574 {
5575 assert!(resp.correlation_id == request.request_id);
5577 }
5578 }
5579
5580 #[tokio::test]
5581 async fn test_unexpected_response_structure() {
5582 use axum::{Json, Router, routing::get};
5584 use serde_json::{Value, json};
5585
5586 #[derive(Clone)]
5587 struct UnexpectedState;
5588
5589 async fn unexpected_structure_handler() -> Json<Value> {
5590 Json(json!({
5592 "error": "Something went wrong",
5593 "code": 500,
5594 "data": null,
5595 "unexpected_field": {
5596 "nested": {
5597 "deeply": [1, 2, 3]
5598 }
5599 }
5600 }))
5601 }
5602
5603 let app = Router::new()
5604 .route("/v4/markets", get(unexpected_structure_handler))
5605 .with_state(UnexpectedState);
5606
5607 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5608 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5609 let port = listener.local_addr().unwrap().port();
5610
5611 tokio::spawn(async move {
5612 axum::serve(listener, app).await.unwrap();
5613 });
5614
5615 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5616
5617 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5618 set_data_event_sender(sender);
5619
5620 let client_id = ClientId::from("DYDX-STRUCT");
5621 let config = DydxDataClientConfig {
5622 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5623 http_timeout_secs: Some(2),
5624 ..Default::default()
5625 };
5626
5627 let http_client = DydxHttpClient::new(
5628 config.base_url_http.clone(),
5629 config.http_timeout_secs,
5630 config.http_proxy_url.clone(),
5631 config.is_testnet,
5632 None,
5633 )
5634 .unwrap();
5635
5636 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5637
5638 let request = RequestInstruments::new(
5639 None,
5640 None,
5641 Some(client_id),
5642 Some(*DYDX_VENUE),
5643 UUID4::new(),
5644 get_atomic_clock_realtime().get_time_ns(),
5645 None,
5646 );
5647
5648 assert!(client.request_instruments(&request).is_ok());
5649
5650 let timeout = tokio::time::Duration::from_secs(3);
5652 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5653 tokio::time::timeout(timeout, rx.recv()).await
5654 {
5655 assert!(
5656 resp.data.is_empty(),
5657 "Expected empty response on unexpected structure"
5658 );
5659 assert!(resp.correlation_id == request.request_id);
5660 }
5661 }
5662
5663 #[tokio::test]
5664 async fn test_empty_markets_object_in_response() {
5665 use axum::{Json, Router, routing::get};
5667 use serde_json::{Value, json};
5668
5669 #[derive(Clone)]
5670 struct EmptyMarketsState;
5671
5672 async fn empty_markets_handler() -> Json<Value> {
5673 Json(json!({
5674 "markets": {}
5675 }))
5676 }
5677
5678 let app = Router::new()
5679 .route("/v4/markets", get(empty_markets_handler))
5680 .with_state(EmptyMarketsState);
5681
5682 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5683 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5684 let port = listener.local_addr().unwrap().port();
5685
5686 tokio::spawn(async move {
5687 axum::serve(listener, app).await.unwrap();
5688 });
5689
5690 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5691
5692 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5693 set_data_event_sender(sender);
5694
5695 let client_id = ClientId::from("DYDX-EMPTY");
5696 let config = DydxDataClientConfig {
5697 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5698 http_timeout_secs: Some(2),
5699 ..Default::default()
5700 };
5701
5702 let http_client = DydxHttpClient::new(
5703 config.base_url_http.clone(),
5704 config.http_timeout_secs,
5705 config.http_proxy_url.clone(),
5706 config.is_testnet,
5707 None,
5708 )
5709 .unwrap();
5710
5711 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5712
5713 let request = RequestInstruments::new(
5714 None,
5715 None,
5716 Some(client_id),
5717 Some(*DYDX_VENUE),
5718 UUID4::new(),
5719 get_atomic_clock_realtime().get_time_ns(),
5720 None,
5721 );
5722
5723 assert!(client.request_instruments(&request).is_ok());
5724
5725 let timeout = tokio::time::Duration::from_secs(3);
5727 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5728 tokio::time::timeout(timeout, rx.recv()).await
5729 {
5730 assert!(
5731 resp.data.is_empty(),
5732 "Expected empty response for empty markets"
5733 );
5734 assert!(resp.correlation_id == request.request_id);
5735 }
5736 }
5737
5738 #[tokio::test]
5739 async fn test_null_values_in_response() {
5740 use axum::{Json, Router, routing::get};
5742 use serde_json::{Value, json};
5743
5744 #[derive(Clone)]
5745 struct NullValuesState;
5746
5747 async fn null_values_handler() -> Json<Value> {
5748 Json(json!({
5749 "markets": {
5750 "BTC-USD": {
5751 "ticker": null,
5752 "status": "ACTIVE",
5753 "baseAsset": null,
5754 "quoteAsset": "USD",
5755 "stepSize": null,
5756 }
5757 }
5758 }))
5759 }
5760
5761 let app = Router::new()
5762 .route("/v4/markets", get(null_values_handler))
5763 .with_state(NullValuesState);
5764
5765 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
5766 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
5767 let port = listener.local_addr().unwrap().port();
5768
5769 tokio::spawn(async move {
5770 axum::serve(listener, app).await.unwrap();
5771 });
5772
5773 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
5774
5775 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5776 set_data_event_sender(sender);
5777
5778 let client_id = ClientId::from("DYDX-NULL");
5779 let config = DydxDataClientConfig {
5780 base_url_http: Some(format!("http://127.0.0.1:{port}")),
5781 http_timeout_secs: Some(2),
5782 ..Default::default()
5783 };
5784
5785 let http_client = DydxHttpClient::new(
5786 config.base_url_http.clone(),
5787 config.http_timeout_secs,
5788 config.http_proxy_url.clone(),
5789 config.is_testnet,
5790 None,
5791 )
5792 .unwrap();
5793
5794 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5795
5796 let request = RequestInstruments::new(
5797 None,
5798 None,
5799 Some(client_id),
5800 Some(*DYDX_VENUE),
5801 UUID4::new(),
5802 get_atomic_clock_realtime().get_time_ns(),
5803 None,
5804 );
5805
5806 assert!(client.request_instruments(&request).is_ok());
5807
5808 let timeout = tokio::time::Duration::from_secs(3);
5810 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
5811 tokio::time::timeout(timeout, rx.recv()).await
5812 {
5813 assert!(resp.correlation_id == request.request_id);
5815 }
5816 }
5817
5818 #[tokio::test]
5819 async fn test_invalid_instrument_id_format() {
5820 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5822 set_data_event_sender(sender);
5823
5824 let client_id = ClientId::from("DYDX-INVALID-ID");
5825 let config = DydxDataClientConfig::default();
5826
5827 let http_client = DydxHttpClient::new(
5828 config.base_url_http.clone(),
5829 config.http_timeout_secs,
5830 config.http_proxy_url.clone(),
5831 config.is_testnet,
5832 None,
5833 )
5834 .unwrap();
5835
5836 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5837
5838 let non_existent_id = InstrumentId::from("NONEXISTENT-USD.DYDX");
5840
5841 let request = RequestInstrument::new(
5842 non_existent_id,
5843 None,
5844 None,
5845 Some(client_id),
5846 UUID4::new(),
5847 get_atomic_clock_realtime().get_time_ns(),
5848 None,
5849 );
5850
5851 assert!(client.request_instrument(&request).is_ok());
5852
5853 let timeout = tokio::time::Duration::from_secs(2);
5855 let result = tokio::time::timeout(timeout, rx.recv()).await;
5856
5857 match result {
5859 Ok(Some(DataEvent::Response(DataResponse::Instrument(_)))) => {
5860 }
5862 Ok(None) | Err(_) => {
5863 }
5865 _ => {}
5866 }
5867 }
5868
5869 #[tokio::test]
5870 async fn test_invalid_date_range_end_before_start() {
5871 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5873 set_data_event_sender(sender);
5874
5875 let client_id = ClientId::from("DYDX-DATE-RANGE");
5876 let config = DydxDataClientConfig::default();
5877
5878 let http_client = DydxHttpClient::new(
5879 config.base_url_http.clone(),
5880 config.http_timeout_secs,
5881 config.http_proxy_url.clone(),
5882 config.is_testnet,
5883 None,
5884 )
5885 .unwrap();
5886
5887 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5888
5889 let instrument = create_test_instrument_any();
5890 let instrument_id = instrument.id();
5891 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5892 client.instruments.insert(symbol_key, instrument);
5893
5894 let start = Utc::now();
5896 let end = start - chrono::Duration::hours(24); let request = RequestTrades::new(
5899 instrument_id,
5900 Some(start),
5901 Some(end),
5902 None,
5903 Some(client_id),
5904 UUID4::new(),
5905 get_atomic_clock_realtime().get_time_ns(),
5906 None,
5907 );
5908
5909 assert!(client.request_trades(&request).is_ok());
5910
5911 let timeout = tokio::time::Duration::from_secs(2);
5913 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
5914 tokio::time::timeout(timeout, rx.recv()).await
5915 {
5916 assert!(resp.correlation_id == request.request_id);
5918 }
5919 }
5920
5921 #[tokio::test]
5922 async fn test_negative_limit_value() {
5923 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5926 set_data_event_sender(sender);
5927
5928 let client_id = ClientId::from("DYDX-NEG-LIMIT");
5929 let config = DydxDataClientConfig::default();
5930
5931 let http_client = DydxHttpClient::new(
5932 config.base_url_http.clone(),
5933 config.http_timeout_secs,
5934 config.http_proxy_url.clone(),
5935 config.is_testnet,
5936 None,
5937 )
5938 .unwrap();
5939
5940 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5941
5942 let instrument = create_test_instrument_any();
5943 let instrument_id = instrument.id();
5944 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5945 client.instruments.insert(symbol_key, instrument);
5946
5947 let request = RequestTrades::new(
5949 instrument_id,
5950 None,
5951 None,
5952 std::num::NonZeroUsize::new(1), Some(client_id),
5954 UUID4::new(),
5955 get_atomic_clock_realtime().get_time_ns(),
5956 None,
5957 );
5958
5959 assert!(client.request_trades(&request).is_ok());
5961 }
5962
5963 #[tokio::test]
5964 async fn test_zero_limit_value() {
5965 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
5968 set_data_event_sender(sender);
5969
5970 let client_id = ClientId::from("DYDX-ZERO-LIMIT");
5971 let config = DydxDataClientConfig::default();
5972
5973 let http_client = DydxHttpClient::new(
5974 config.base_url_http.clone(),
5975 config.http_timeout_secs,
5976 config.http_proxy_url.clone(),
5977 config.is_testnet,
5978 None,
5979 )
5980 .unwrap();
5981
5982 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
5983
5984 let instrument = create_test_instrument_any();
5985 let instrument_id = instrument.id();
5986 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
5987 client.instruments.insert(symbol_key, instrument);
5988
5989 let request = RequestTrades::new(
5990 instrument_id,
5991 None,
5992 None,
5993 None, Some(client_id),
5995 UUID4::new(),
5996 get_atomic_clock_realtime().get_time_ns(),
5997 None,
5998 );
5999
6000 assert!(client.request_trades(&request).is_ok());
6001
6002 let timeout = tokio::time::Duration::from_secs(2);
6004 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6005 tokio::time::timeout(timeout, rx.recv()).await
6006 {
6007 assert!(resp.correlation_id == request.request_id);
6008 }
6009 }
6010
6011 #[tokio::test]
6012 async fn test_very_large_limit_value() {
6013 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6015 set_data_event_sender(sender);
6016
6017 let client_id = ClientId::from("DYDX-LARGE-LIMIT");
6018 let config = DydxDataClientConfig::default();
6019
6020 let http_client = DydxHttpClient::new(
6021 config.base_url_http.clone(),
6022 config.http_timeout_secs,
6023 config.http_proxy_url.clone(),
6024 config.is_testnet,
6025 None,
6026 )
6027 .unwrap();
6028
6029 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6030
6031 let instrument = create_test_instrument_any();
6032 let instrument_id = instrument.id();
6033 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6034 client.instruments.insert(symbol_key, instrument);
6035
6036 let request = RequestTrades::new(
6037 instrument_id,
6038 None,
6039 None,
6040 std::num::NonZeroUsize::new(1_000_000), Some(client_id),
6042 UUID4::new(),
6043 get_atomic_clock_realtime().get_time_ns(),
6044 None,
6045 );
6046
6047 assert!(client.request_trades(&request).is_ok());
6049
6050 let timeout = tokio::time::Duration::from_secs(2);
6052 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6053 tokio::time::timeout(timeout, rx.recv()).await
6054 {
6055 assert!(resp.correlation_id == request.request_id);
6056 }
6057 }
6058
6059 #[tokio::test]
6060 async fn test_none_limit_uses_default() {
6061 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6063 set_data_event_sender(sender);
6064
6065 let client_id = ClientId::from("DYDX-NONE-LIMIT");
6066 let config = DydxDataClientConfig::default();
6067
6068 let http_client = DydxHttpClient::new(
6069 config.base_url_http.clone(),
6070 config.http_timeout_secs,
6071 config.http_proxy_url.clone(),
6072 config.is_testnet,
6073 None,
6074 )
6075 .unwrap();
6076
6077 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6078
6079 let instrument = create_test_instrument_any();
6080 let instrument_id = instrument.id();
6081 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6082 client.instruments.insert(symbol_key, instrument);
6083
6084 let request = RequestTrades::new(
6085 instrument_id,
6086 None,
6087 None,
6088 None, Some(client_id),
6090 UUID4::new(),
6091 get_atomic_clock_realtime().get_time_ns(),
6092 None,
6093 );
6094
6095 assert!(client.request_trades(&request).is_ok());
6097
6098 let timeout = tokio::time::Duration::from_secs(2);
6099 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
6100 tokio::time::timeout(timeout, rx.recv()).await
6101 {
6102 assert!(resp.correlation_id == request.request_id);
6103 }
6104 }
6105
6106 #[tokio::test]
6107 async fn test_validation_does_not_panic() {
6108 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6110 set_data_event_sender(sender);
6111
6112 let client_id = ClientId::from("DYDX-VALIDATION");
6113 let config = DydxDataClientConfig::default();
6114
6115 let http_client = DydxHttpClient::new(
6116 config.base_url_http.clone(),
6117 config.http_timeout_secs,
6118 config.http_proxy_url.clone(),
6119 config.is_testnet,
6120 None,
6121 )
6122 .unwrap();
6123
6124 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6125
6126 let instrument = create_test_instrument_any();
6127 let instrument_id = instrument.id();
6128 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6129 client.instruments.insert(symbol_key, instrument);
6130
6131 let invalid_id = InstrumentId::from("INVALID.WRONG");
6133 let req1 = RequestInstrument::new(
6134 invalid_id,
6135 None,
6136 None,
6137 Some(client_id),
6138 UUID4::new(),
6139 get_atomic_clock_realtime().get_time_ns(),
6140 None,
6141 );
6142 assert!(client.request_instrument(&req1).is_ok());
6143
6144 let start = Utc::now();
6146 let end = start - chrono::Duration::hours(1);
6147 let req2 = RequestTrades::new(
6148 instrument_id,
6149 Some(start),
6150 Some(end),
6151 None,
6152 Some(client_id),
6153 UUID4::new(),
6154 get_atomic_clock_realtime().get_time_ns(),
6155 None,
6156 );
6157 assert!(client.request_trades(&req2).is_ok());
6158
6159 let req3 = RequestTrades::new(
6161 instrument_id,
6162 None,
6163 None,
6164 std::num::NonZeroUsize::new(1),
6165 Some(client_id),
6166 UUID4::new(),
6167 get_atomic_clock_realtime().get_time_ns(),
6168 None,
6169 );
6170 assert!(client.request_trades(&req3).is_ok());
6171
6172 let req4 = RequestTrades::new(
6174 instrument_id,
6175 None,
6176 None,
6177 std::num::NonZeroUsize::new(usize::MAX),
6178 Some(client_id),
6179 UUID4::new(),
6180 get_atomic_clock_realtime().get_time_ns(),
6181 None,
6182 );
6183 assert!(client.request_trades(&req4).is_ok());
6184
6185 }
6187
6188 #[tokio::test]
6189 async fn test_instruments_response_has_correct_venue() {
6190 use axum::{Json, Router, routing::get};
6192 use serde_json::{Value, json};
6193
6194 #[derive(Clone)]
6195 struct VenueTestState;
6196
6197 async fn venue_handler() -> Json<Value> {
6198 Json(json!({
6199 "markets": {
6200 "BTC-USD": {
6201 "ticker": "BTC-USD",
6202 "status": "ACTIVE",
6203 "baseAsset": "BTC",
6204 "quoteAsset": "USD",
6205 "stepSize": "0.0001",
6206 "tickSize": "1",
6207 "indexPrice": "50000",
6208 "oraclePrice": "50000",
6209 "priceChange24H": "1000",
6210 "nextFundingRate": "0.0001",
6211 "nextFundingAt": "2024-01-01T00:00:00.000Z",
6212 "minOrderSize": "0.001",
6213 "type": "PERPETUAL",
6214 "initialMarginFraction": "0.05",
6215 "maintenanceMarginFraction": "0.03",
6216 "volume24H": "1000000",
6217 "trades24H": "10000",
6218 "openInterest": "5000000",
6219 "incrementalInitialMarginFraction": "0.01",
6220 "incrementalPositionSize": "10",
6221 "maxPositionSize": "1000",
6222 "baselinePositionSize": "100",
6223 "assetResolution": "10000000000"
6224 }
6225 }
6226 }))
6227 }
6228
6229 let app = Router::new()
6230 .route("/v4/markets", get(venue_handler))
6231 .with_state(VenueTestState);
6232
6233 let addr = SocketAddr::from(([127, 0, 0, 1], 0));
6234 let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
6235 let port = listener.local_addr().unwrap().port();
6236
6237 tokio::spawn(async move {
6238 axum::serve(listener, app).await.unwrap();
6239 });
6240
6241 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
6242
6243 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6244 set_data_event_sender(sender);
6245
6246 let client_id = ClientId::from("DYDX-VENUE-TEST");
6247 let config = DydxDataClientConfig {
6248 base_url_http: Some(format!("http://127.0.0.1:{port}")),
6249 http_timeout_secs: Some(2),
6250 ..Default::default()
6251 };
6252
6253 let http_client = DydxHttpClient::new(
6254 config.base_url_http.clone(),
6255 config.http_timeout_secs,
6256 config.http_proxy_url.clone(),
6257 config.is_testnet,
6258 None,
6259 )
6260 .unwrap();
6261
6262 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6263
6264 let request = RequestInstruments::new(
6265 None,
6266 None,
6267 Some(client_id),
6268 Some(*DYDX_VENUE),
6269 UUID4::new(),
6270 get_atomic_clock_realtime().get_time_ns(),
6271 None,
6272 );
6273
6274 assert!(client.request_instruments(&request).is_ok());
6275
6276 let timeout = tokio::time::Duration::from_secs(3);
6277 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6278 tokio::time::timeout(timeout, rx.recv()).await
6279 {
6280 assert_eq!(resp.venue, *DYDX_VENUE, "Response should have DYDX venue");
6282 }
6283 }
6284
6285 #[tokio::test]
6286 async fn test_instruments_response_contains_vec_instrument_any() {
6287 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6289 set_data_event_sender(sender);
6290
6291 let client_id = ClientId::from("DYDX-VEC-TEST");
6292 let config = DydxDataClientConfig::default();
6293
6294 let http_client = DydxHttpClient::new(
6295 config.base_url_http.clone(),
6296 config.http_timeout_secs,
6297 config.http_proxy_url.clone(),
6298 config.is_testnet,
6299 None,
6300 )
6301 .unwrap();
6302
6303 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6304
6305 let request = RequestInstruments::new(
6306 None,
6307 None,
6308 Some(client_id),
6309 Some(*DYDX_VENUE),
6310 UUID4::new(),
6311 get_atomic_clock_realtime().get_time_ns(),
6312 None,
6313 );
6314
6315 assert!(client.request_instruments(&request).is_ok());
6316
6317 let timeout = tokio::time::Duration::from_secs(2);
6318 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6319 tokio::time::timeout(timeout, rx.recv()).await
6320 {
6321 assert!(
6323 resp.data.is_empty() || !resp.data.is_empty(),
6324 "data should be Vec<InstrumentAny>"
6325 );
6326 }
6327 }
6328
6329 #[tokio::test]
6330 async fn test_instruments_response_includes_correlation_id() {
6331 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6333 set_data_event_sender(sender);
6334
6335 let client_id = ClientId::from("DYDX-CORR-TEST");
6336 let config = DydxDataClientConfig::default();
6337
6338 let http_client = DydxHttpClient::new(
6339 config.base_url_http.clone(),
6340 config.http_timeout_secs,
6341 config.http_proxy_url.clone(),
6342 config.is_testnet,
6343 None,
6344 )
6345 .unwrap();
6346
6347 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6348
6349 let request_id = UUID4::new();
6350 let request = RequestInstruments::new(
6351 None,
6352 None,
6353 Some(client_id),
6354 Some(*DYDX_VENUE),
6355 request_id,
6356 get_atomic_clock_realtime().get_time_ns(),
6357 None,
6358 );
6359
6360 assert!(client.request_instruments(&request).is_ok());
6361
6362 let timeout = tokio::time::Duration::from_secs(2);
6363 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6364 tokio::time::timeout(timeout, rx.recv()).await
6365 {
6366 assert_eq!(
6368 resp.correlation_id, request_id,
6369 "correlation_id should match request_id"
6370 );
6371 }
6372 }
6373
6374 #[tokio::test]
6375 async fn test_instruments_response_includes_client_id() {
6376 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6378 set_data_event_sender(sender);
6379
6380 let client_id = ClientId::from("DYDX-CLIENT-TEST");
6381 let config = DydxDataClientConfig::default();
6382
6383 let http_client = DydxHttpClient::new(
6384 config.base_url_http.clone(),
6385 config.http_timeout_secs,
6386 config.http_proxy_url.clone(),
6387 config.is_testnet,
6388 None,
6389 )
6390 .unwrap();
6391
6392 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6393
6394 let request = RequestInstruments::new(
6395 None,
6396 None,
6397 Some(client_id),
6398 Some(*DYDX_VENUE),
6399 UUID4::new(),
6400 get_atomic_clock_realtime().get_time_ns(),
6401 None,
6402 );
6403
6404 assert!(client.request_instruments(&request).is_ok());
6405
6406 let timeout = tokio::time::Duration::from_secs(2);
6407 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6408 tokio::time::timeout(timeout, rx.recv()).await
6409 {
6410 assert_eq!(
6412 resp.client_id, client_id,
6413 "client_id should be included in response"
6414 );
6415 }
6416 }
6417
6418 #[tokio::test]
6419 async fn test_instruments_response_includes_timestamps() {
6420 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6422 set_data_event_sender(sender);
6423
6424 let client_id = ClientId::from("DYDX-TS-TEST");
6425 let config = DydxDataClientConfig::default();
6426
6427 let http_client = DydxHttpClient::new(
6428 config.base_url_http.clone(),
6429 config.http_timeout_secs,
6430 config.http_proxy_url.clone(),
6431 config.is_testnet,
6432 None,
6433 )
6434 .unwrap();
6435
6436 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6437
6438 let start = Some(Utc::now() - chrono::Duration::days(1));
6439 let end = Some(Utc::now());
6440 let ts_init = get_atomic_clock_realtime().get_time_ns();
6441
6442 let request = RequestInstruments::new(
6443 start,
6444 end,
6445 Some(client_id),
6446 Some(*DYDX_VENUE),
6447 UUID4::new(),
6448 ts_init,
6449 None,
6450 );
6451
6452 assert!(client.request_instruments(&request).is_ok());
6453
6454 let timeout = tokio::time::Duration::from_secs(2);
6455 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6456 tokio::time::timeout(timeout, rx.recv()).await
6457 {
6458 assert!(
6460 resp.start.is_some() || resp.start.is_none(),
6461 "start timestamp field exists"
6462 );
6463 assert!(
6464 resp.end.is_some() || resp.end.is_none(),
6465 "end timestamp field exists"
6466 );
6467 assert!(resp.ts_init > 0, "ts_init should be greater than 0");
6468 }
6469 }
6470
6471 #[tokio::test]
6472 async fn test_instruments_response_includes_params_when_provided() {
6473 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6475 set_data_event_sender(sender);
6476
6477 let client_id = ClientId::from("DYDX-PARAMS-TEST");
6478 let config = DydxDataClientConfig::default();
6479
6480 let http_client = DydxHttpClient::new(
6481 config.base_url_http.clone(),
6482 config.http_timeout_secs,
6483 config.http_proxy_url.clone(),
6484 config.is_testnet,
6485 None,
6486 )
6487 .unwrap();
6488
6489 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6490
6491 let request = RequestInstruments::new(
6494 None,
6495 None,
6496 Some(client_id),
6497 Some(*DYDX_VENUE),
6498 UUID4::new(),
6499 get_atomic_clock_realtime().get_time_ns(),
6500 None, );
6502
6503 assert!(client.request_instruments(&request).is_ok());
6504
6505 let timeout = tokio::time::Duration::from_secs(2);
6506 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6507 tokio::time::timeout(timeout, rx.recv()).await
6508 {
6509 let _params = resp.params;
6511 }
6512 }
6513
6514 #[tokio::test]
6515 async fn test_instruments_response_params_none_when_not_provided() {
6516 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6518 set_data_event_sender(sender);
6519
6520 let client_id = ClientId::from("DYDX-NO-PARAMS");
6521 let config = DydxDataClientConfig::default();
6522
6523 let http_client = DydxHttpClient::new(
6524 config.base_url_http.clone(),
6525 config.http_timeout_secs,
6526 config.http_proxy_url.clone(),
6527 config.is_testnet,
6528 None,
6529 )
6530 .unwrap();
6531
6532 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6533
6534 let request = RequestInstruments::new(
6535 None,
6536 None,
6537 Some(client_id),
6538 Some(*DYDX_VENUE),
6539 UUID4::new(),
6540 get_atomic_clock_realtime().get_time_ns(),
6541 None, );
6543
6544 assert!(client.request_instruments(&request).is_ok());
6545
6546 let timeout = tokio::time::Duration::from_secs(2);
6547 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6548 tokio::time::timeout(timeout, rx.recv()).await
6549 {
6550 assert!(
6552 resp.params.is_none(),
6553 "params should be None when not provided"
6554 );
6555 }
6556 }
6557
6558 #[tokio::test]
6559 async fn test_instruments_response_complete_structure() {
6560 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6562 set_data_event_sender(sender);
6563
6564 let client_id = ClientId::from("DYDX-FULL-TEST");
6565 let config = DydxDataClientConfig::default();
6566
6567 let http_client = DydxHttpClient::new(
6568 config.base_url_http.clone(),
6569 config.http_timeout_secs,
6570 config.http_proxy_url.clone(),
6571 config.is_testnet,
6572 None,
6573 )
6574 .unwrap();
6575
6576 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6577
6578 let request_id = UUID4::new();
6579 let start = Some(Utc::now() - chrono::Duration::hours(1));
6580 let end = Some(Utc::now());
6581 let ts_init = get_atomic_clock_realtime().get_time_ns();
6582
6583 let request = RequestInstruments::new(
6584 start,
6585 end,
6586 Some(client_id),
6587 Some(*DYDX_VENUE),
6588 request_id,
6589 ts_init,
6590 None,
6591 );
6592
6593 assert!(client.request_instruments(&request).is_ok());
6594
6595 let timeout = tokio::time::Duration::from_secs(2);
6596 if let Ok(Some(DataEvent::Response(DataResponse::Instruments(resp)))) =
6597 tokio::time::timeout(timeout, rx.recv()).await
6598 {
6599 assert_eq!(resp.venue, *DYDX_VENUE, "venue should be DYDX");
6601 assert_eq!(
6602 resp.correlation_id, request_id,
6603 "correlation_id should match"
6604 );
6605 assert_eq!(resp.client_id, client_id, "client_id should match");
6606 assert!(resp.ts_init > 0, "ts_init should be set");
6607
6608 let _data: Vec<InstrumentAny> = resp.data;
6610
6611 let _start = resp.start;
6613 let _end = resp.end;
6614 let _params = resp.params;
6615 }
6616 }
6617
6618 #[tokio::test]
6619 async fn test_instrument_response_properly_boxed() {
6620 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6622 set_data_event_sender(sender);
6623
6624 let client_id = ClientId::from("DYDX-BOXED-TEST");
6625 let config = DydxDataClientConfig::default();
6626
6627 let http_client = DydxHttpClient::new(
6628 config.base_url_http.clone(),
6629 config.http_timeout_secs,
6630 config.http_proxy_url.clone(),
6631 config.is_testnet,
6632 None,
6633 )
6634 .unwrap();
6635
6636 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6637
6638 let instrument = create_test_instrument_any();
6639 let instrument_id = instrument.id();
6640 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6641 client.instruments.insert(symbol_key, instrument);
6642
6643 let request = RequestInstrument::new(
6644 instrument_id,
6645 None,
6646 None,
6647 Some(client_id),
6648 UUID4::new(),
6649 get_atomic_clock_realtime().get_time_ns(),
6650 None,
6651 );
6652
6653 assert!(client.request_instrument(&request).is_ok());
6654
6655 let timeout = tokio::time::Duration::from_secs(2);
6656 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(boxed_resp)))) =
6657 tokio::time::timeout(timeout, rx.recv()).await
6658 {
6659 let _response: Box<InstrumentResponse> = boxed_resp;
6661 }
6663 }
6664
6665 #[tokio::test]
6666 async fn test_instrument_response_contains_single_instrument() {
6667 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6669 set_data_event_sender(sender);
6670
6671 let client_id = ClientId::from("DYDX-SINGLE-TEST");
6672 let config = DydxDataClientConfig::default();
6673
6674 let http_client = DydxHttpClient::new(
6675 config.base_url_http.clone(),
6676 config.http_timeout_secs,
6677 config.http_proxy_url.clone(),
6678 config.is_testnet,
6679 None,
6680 )
6681 .unwrap();
6682
6683 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6684
6685 let instrument = create_test_instrument_any();
6686 let instrument_id = instrument.id();
6687 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6688 client.instruments.insert(symbol_key, instrument.clone());
6689
6690 let request = RequestInstrument::new(
6691 instrument_id,
6692 None,
6693 None,
6694 Some(client_id),
6695 UUID4::new(),
6696 get_atomic_clock_realtime().get_time_ns(),
6697 None,
6698 );
6699
6700 assert!(client.request_instrument(&request).is_ok());
6701
6702 let timeout = tokio::time::Duration::from_secs(2);
6703 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6704 tokio::time::timeout(timeout, rx.recv()).await
6705 {
6706 let _instrument: InstrumentAny = resp.data;
6708 }
6710 }
6711
6712 #[tokio::test]
6713 async fn test_instrument_response_has_correct_instrument_id() {
6714 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6716 set_data_event_sender(sender);
6717
6718 let client_id = ClientId::from("DYDX-ID-TEST");
6719 let config = DydxDataClientConfig::default();
6720
6721 let http_client = DydxHttpClient::new(
6722 config.base_url_http.clone(),
6723 config.http_timeout_secs,
6724 config.http_proxy_url.clone(),
6725 config.is_testnet,
6726 None,
6727 )
6728 .unwrap();
6729
6730 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6731
6732 let instrument = create_test_instrument_any();
6733 let instrument_id = instrument.id();
6734 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6735 client.instruments.insert(symbol_key, instrument);
6736
6737 let request = RequestInstrument::new(
6738 instrument_id,
6739 None,
6740 None,
6741 Some(client_id),
6742 UUID4::new(),
6743 get_atomic_clock_realtime().get_time_ns(),
6744 None,
6745 );
6746
6747 assert!(client.request_instrument(&request).is_ok());
6748
6749 let timeout = tokio::time::Duration::from_secs(2);
6750 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6751 tokio::time::timeout(timeout, rx.recv()).await
6752 {
6753 assert_eq!(
6755 resp.instrument_id, instrument_id,
6756 "instrument_id should match requested ID"
6757 );
6758 }
6759 }
6760
6761 #[tokio::test]
6762 async fn test_instrument_response_includes_metadata() {
6763 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6765 set_data_event_sender(sender);
6766
6767 let client_id = ClientId::from("DYDX-META-TEST");
6768 let config = DydxDataClientConfig::default();
6769
6770 let http_client = DydxHttpClient::new(
6771 config.base_url_http.clone(),
6772 config.http_timeout_secs,
6773 config.http_proxy_url.clone(),
6774 config.is_testnet,
6775 None,
6776 )
6777 .unwrap();
6778
6779 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6780
6781 let instrument = create_test_instrument_any();
6782 let instrument_id = instrument.id();
6783 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6784 client.instruments.insert(symbol_key, instrument);
6785
6786 let request_id = UUID4::new();
6787 let start = Some(Utc::now() - chrono::Duration::hours(1));
6788 let end = Some(Utc::now());
6789 let ts_init = get_atomic_clock_realtime().get_time_ns();
6790
6791 let request = RequestInstrument::new(
6792 instrument_id,
6793 start,
6794 end,
6795 Some(client_id),
6796 request_id,
6797 ts_init,
6798 None,
6799 );
6800
6801 assert!(client.request_instrument(&request).is_ok());
6802
6803 let timeout = tokio::time::Duration::from_secs(2);
6804 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6805 tokio::time::timeout(timeout, rx.recv()).await
6806 {
6807 assert_eq!(
6809 resp.correlation_id, request_id,
6810 "correlation_id should match"
6811 );
6812 assert_eq!(resp.client_id, client_id, "client_id should match");
6813 assert!(resp.ts_init > 0, "ts_init should be set");
6814
6815 let _start = resp.start;
6817 let _end = resp.end;
6818
6819 let _params = resp.params;
6821 }
6822 }
6823
6824 #[tokio::test]
6825 async fn test_instrument_response_matches_requested_instrument() {
6826 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6828 set_data_event_sender(sender);
6829
6830 let client_id = ClientId::from("DYDX-MATCH-TEST");
6831 let config = DydxDataClientConfig::default();
6832
6833 let http_client = DydxHttpClient::new(
6834 config.base_url_http.clone(),
6835 config.http_timeout_secs,
6836 config.http_proxy_url.clone(),
6837 config.is_testnet,
6838 None,
6839 )
6840 .unwrap();
6841
6842 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6843
6844 let instrument = create_test_instrument_any();
6845 let instrument_id = instrument.id();
6846 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6847 client.instruments.insert(symbol_key, instrument.clone());
6848
6849 let request = RequestInstrument::new(
6850 instrument_id,
6851 None,
6852 None,
6853 Some(client_id),
6854 UUID4::new(),
6855 get_atomic_clock_realtime().get_time_ns(),
6856 None,
6857 );
6858
6859 assert!(client.request_instrument(&request).is_ok());
6860
6861 let timeout = tokio::time::Duration::from_secs(2);
6862 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6863 tokio::time::timeout(timeout, rx.recv()).await
6864 {
6865 assert_eq!(
6867 resp.data.id(),
6868 instrument_id,
6869 "Returned instrument should match requested"
6870 );
6871 assert_eq!(
6872 resp.instrument_id, instrument_id,
6873 "instrument_id field should match"
6874 );
6875
6876 assert_eq!(
6878 resp.data.id(),
6879 resp.instrument_id,
6880 "data.id() should match instrument_id field"
6881 );
6882 }
6883 }
6884
6885 #[tokio::test]
6886 async fn test_instrument_response_complete_structure() {
6887 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6889 set_data_event_sender(sender);
6890
6891 let client_id = ClientId::from("DYDX-FULL-INST-TEST");
6892 let config = DydxDataClientConfig::default();
6893
6894 let http_client = DydxHttpClient::new(
6895 config.base_url_http.clone(),
6896 config.http_timeout_secs,
6897 config.http_proxy_url.clone(),
6898 config.is_testnet,
6899 None,
6900 )
6901 .unwrap();
6902
6903 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
6904
6905 let instrument = create_test_instrument_any();
6906 let instrument_id = instrument.id();
6907 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
6908 client.instruments.insert(symbol_key, instrument.clone());
6909
6910 let request_id = UUID4::new();
6911 let ts_init = get_atomic_clock_realtime().get_time_ns();
6912
6913 let request = RequestInstrument::new(
6914 instrument_id,
6915 None,
6916 None,
6917 Some(client_id),
6918 request_id,
6919 ts_init,
6920 None,
6921 );
6922
6923 assert!(client.request_instrument(&request).is_ok());
6924
6925 let timeout = tokio::time::Duration::from_secs(2);
6926 if let Ok(Some(DataEvent::Response(DataResponse::Instrument(resp)))) =
6927 tokio::time::timeout(timeout, rx.recv()).await
6928 {
6929 let _boxed: Box<InstrumentResponse> = resp.clone();
6932
6933 assert_eq!(resp.correlation_id, request_id);
6935 assert_eq!(resp.client_id, client_id);
6936 assert_eq!(resp.instrument_id, instrument_id);
6937 assert!(resp.ts_init > 0);
6938
6939 let returned_instrument: InstrumentAny = resp.data;
6941 assert_eq!(returned_instrument.id(), instrument_id);
6942
6943 let _start = resp.start;
6945 let _end = resp.end;
6946 let _params = resp.params;
6947 }
6948 }
6949
6950 #[tokio::test]
6951 async fn test_trades_response_contains_vec_trade_tick() {
6952 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6954 set_data_event_sender(sender);
6955
6956 let created_at = Utc::now();
6957 let http_trades = vec![
6958 crate::http::models::Trade {
6959 id: "trade-1".to_string(),
6960 side: OrderSide::Buy,
6961 size: dec!(1.0),
6962 price: dec!(100.0),
6963 created_at,
6964 created_at_height: 100,
6965 trade_type: crate::common::enums::DydxTradeType::Limit,
6966 },
6967 crate::http::models::Trade {
6968 id: "trade-2".to_string(),
6969 side: OrderSide::Sell,
6970 size: dec!(2.0),
6971 price: dec!(101.0),
6972 created_at: created_at + chrono::Duration::seconds(1),
6973 created_at_height: 101,
6974 trade_type: crate::common::enums::DydxTradeType::Limit,
6975 },
6976 ];
6977
6978 let trades_response = crate::http::models::TradesResponse {
6979 trades: http_trades,
6980 };
6981
6982 let state = TradesTestState {
6983 response: Arc::new(trades_response),
6984 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
6985 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
6986 };
6987
6988 let addr = start_trades_test_server(state).await;
6989 let base_url = format!("http://{addr}");
6990
6991 let client_id = ClientId::from("DYDX-VEC-TEST");
6992 let config = DydxDataClientConfig {
6993 base_url_http: Some(base_url),
6994 is_testnet: true,
6995 ..Default::default()
6996 };
6997
6998 let http_client = DydxHttpClient::new(
6999 config.base_url_http.clone(),
7000 config.http_timeout_secs,
7001 config.http_proxy_url.clone(),
7002 config.is_testnet,
7003 None,
7004 )
7005 .unwrap();
7006
7007 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7008
7009 let instrument = create_test_instrument_any();
7010 let instrument_id = instrument.id();
7011 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7012 client.instruments.insert(symbol_key, instrument);
7013
7014 let request = RequestTrades::new(
7015 instrument_id,
7016 None,
7017 None,
7018 None,
7019 Some(client_id),
7020 UUID4::new(),
7021 get_atomic_clock_realtime().get_time_ns(),
7022 None,
7023 );
7024
7025 assert!(client.request_trades(&request).is_ok());
7026
7027 let timeout = tokio::time::Duration::from_millis(500);
7028 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7029 tokio::time::timeout(timeout, rx.recv()).await
7030 {
7031 let trade_ticks: Vec<TradeTick> = resp.data;
7033 assert_eq!(trade_ticks.len(), 2, "Should contain 2 TradeTick elements");
7034
7035 for tick in &trade_ticks {
7037 assert_eq!(tick.instrument_id, instrument_id);
7038 }
7039 }
7040 }
7041
7042 #[tokio::test]
7043 async fn test_trades_response_has_correct_instrument_id() {
7044 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7046 set_data_event_sender(sender);
7047
7048 let created_at = Utc::now();
7049 let http_trade = crate::http::models::Trade {
7050 id: "instrument-id-test".to_string(),
7051 side: OrderSide::Buy,
7052 size: dec!(1.0),
7053 price: dec!(100.0),
7054 created_at,
7055 created_at_height: 100,
7056 trade_type: crate::common::enums::DydxTradeType::Limit,
7057 };
7058
7059 let trades_response = crate::http::models::TradesResponse {
7060 trades: vec![http_trade],
7061 };
7062
7063 let state = TradesTestState {
7064 response: Arc::new(trades_response),
7065 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7066 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7067 };
7068
7069 let addr = start_trades_test_server(state).await;
7070 let base_url = format!("http://{addr}");
7071
7072 let client_id = ClientId::from("DYDX-INSTID-TEST");
7073 let config = DydxDataClientConfig {
7074 base_url_http: Some(base_url),
7075 is_testnet: true,
7076 ..Default::default()
7077 };
7078
7079 let http_client = DydxHttpClient::new(
7080 config.base_url_http.clone(),
7081 config.http_timeout_secs,
7082 config.http_proxy_url.clone(),
7083 config.is_testnet,
7084 None,
7085 )
7086 .unwrap();
7087
7088 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7089
7090 let instrument = create_test_instrument_any();
7091 let instrument_id = instrument.id();
7092 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7093 client.instruments.insert(symbol_key, instrument);
7094
7095 let request = RequestTrades::new(
7096 instrument_id,
7097 None,
7098 None,
7099 None,
7100 Some(client_id),
7101 UUID4::new(),
7102 get_atomic_clock_realtime().get_time_ns(),
7103 None,
7104 );
7105
7106 assert!(client.request_trades(&request).is_ok());
7107
7108 let timeout = tokio::time::Duration::from_millis(500);
7109 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7110 tokio::time::timeout(timeout, rx.recv()).await
7111 {
7112 assert_eq!(
7114 resp.instrument_id, instrument_id,
7115 "TradesResponse.instrument_id should match request"
7116 );
7117
7118 for tick in &resp.data {
7120 assert_eq!(
7121 tick.instrument_id, instrument_id,
7122 "Each TradeTick should have correct instrument_id"
7123 );
7124 }
7125 }
7126 }
7127
7128 #[tokio::test]
7129 async fn test_trades_response_properly_ordered_by_timestamp() {
7130 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7132 set_data_event_sender(sender);
7133
7134 let base_time = Utc::now();
7135 let http_trades = vec![
7136 crate::http::models::Trade {
7137 id: "trade-oldest".to_string(),
7138 side: OrderSide::Buy,
7139 size: dec!(1.0),
7140 price: dec!(100.0),
7141 created_at: base_time,
7142 created_at_height: 100,
7143 trade_type: crate::common::enums::DydxTradeType::Limit,
7144 },
7145 crate::http::models::Trade {
7146 id: "trade-middle".to_string(),
7147 side: OrderSide::Sell,
7148 size: dec!(2.0),
7149 price: dec!(101.0),
7150 created_at: base_time + chrono::Duration::seconds(1),
7151 created_at_height: 101,
7152 trade_type: crate::common::enums::DydxTradeType::Limit,
7153 },
7154 crate::http::models::Trade {
7155 id: "trade-newest".to_string(),
7156 side: OrderSide::Buy,
7157 size: dec!(3.0),
7158 price: dec!(102.0),
7159 created_at: base_time + chrono::Duration::seconds(2),
7160 created_at_height: 102,
7161 trade_type: crate::common::enums::DydxTradeType::Limit,
7162 },
7163 ];
7164
7165 let trades_response = crate::http::models::TradesResponse {
7166 trades: http_trades,
7167 };
7168
7169 let state = TradesTestState {
7170 response: Arc::new(trades_response),
7171 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7172 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7173 };
7174
7175 let addr = start_trades_test_server(state).await;
7176 let base_url = format!("http://{addr}");
7177
7178 let client_id = ClientId::from("DYDX-ORDER-TEST");
7179 let config = DydxDataClientConfig {
7180 base_url_http: Some(base_url),
7181 is_testnet: true,
7182 ..Default::default()
7183 };
7184
7185 let http_client = DydxHttpClient::new(
7186 config.base_url_http.clone(),
7187 config.http_timeout_secs,
7188 config.http_proxy_url.clone(),
7189 config.is_testnet,
7190 None,
7191 )
7192 .unwrap();
7193
7194 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7195
7196 let instrument = create_test_instrument_any();
7197 let instrument_id = instrument.id();
7198 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7199 client.instruments.insert(symbol_key, instrument);
7200
7201 let request = RequestTrades::new(
7202 instrument_id,
7203 None,
7204 None,
7205 None,
7206 Some(client_id),
7207 UUID4::new(),
7208 get_atomic_clock_realtime().get_time_ns(),
7209 None,
7210 );
7211
7212 assert!(client.request_trades(&request).is_ok());
7213
7214 let timeout = tokio::time::Duration::from_millis(500);
7215 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7216 tokio::time::timeout(timeout, rx.recv()).await
7217 {
7218 let trade_ticks = resp.data;
7220 assert_eq!(trade_ticks.len(), 3, "Should have 3 trades");
7221
7222 for i in 1..trade_ticks.len() {
7224 assert!(
7225 trade_ticks[i].ts_event >= trade_ticks[i - 1].ts_event,
7226 "Trades should be ordered by timestamp (ts_event) in ascending order"
7227 );
7228 }
7229
7230 assert!(
7232 trade_ticks[0].ts_event < trade_ticks[1].ts_event,
7233 "First trade should be before second"
7234 );
7235 assert!(
7236 trade_ticks[1].ts_event < trade_ticks[2].ts_event,
7237 "Second trade should be before third"
7238 );
7239 }
7240 }
7241
7242 #[tokio::test]
7243 async fn test_trades_response_all_trade_tick_fields_populated() {
7244 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7246 set_data_event_sender(sender);
7247
7248 let created_at = Utc::now();
7249 let http_trade = crate::http::models::Trade {
7250 id: "field-test".to_string(),
7251 side: OrderSide::Buy,
7252 size: dec!(5.5),
7253 price: dec!(12345.67),
7254 created_at,
7255 created_at_height: 999,
7256 trade_type: crate::common::enums::DydxTradeType::Limit,
7257 };
7258
7259 let trades_response = crate::http::models::TradesResponse {
7260 trades: vec![http_trade],
7261 };
7262
7263 let state = TradesTestState {
7264 response: Arc::new(trades_response),
7265 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7266 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7267 };
7268
7269 let addr = start_trades_test_server(state).await;
7270 let base_url = format!("http://{addr}");
7271
7272 let client_id = ClientId::from("DYDX-FIELDS-TEST");
7273 let config = DydxDataClientConfig {
7274 base_url_http: Some(base_url),
7275 is_testnet: true,
7276 ..Default::default()
7277 };
7278
7279 let http_client = DydxHttpClient::new(
7280 config.base_url_http.clone(),
7281 config.http_timeout_secs,
7282 config.http_proxy_url.clone(),
7283 config.is_testnet,
7284 None,
7285 )
7286 .unwrap();
7287
7288 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7289
7290 let instrument = create_test_instrument_any();
7291 let instrument_id = instrument.id();
7292 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7293 client.instruments.insert(symbol_key, instrument);
7294
7295 let request = RequestTrades::new(
7296 instrument_id,
7297 None,
7298 None,
7299 None,
7300 Some(client_id),
7301 UUID4::new(),
7302 get_atomic_clock_realtime().get_time_ns(),
7303 None,
7304 );
7305
7306 assert!(client.request_trades(&request).is_ok());
7307
7308 let timeout = tokio::time::Duration::from_millis(500);
7309 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7310 tokio::time::timeout(timeout, rx.recv()).await
7311 {
7312 assert_eq!(resp.data.len(), 1, "Should have 1 trade");
7313 let tick = &resp.data[0];
7314
7315 assert_eq!(
7317 tick.instrument_id, instrument_id,
7318 "instrument_id should be set"
7319 );
7320 assert!(tick.price.as_f64() > 0.0, "price should be positive");
7321 assert!(tick.size.as_f64() > 0.0, "size should be positive");
7322
7323 match tick.aggressor_side {
7325 AggressorSide::Buyer | AggressorSide::Seller => {
7326 }
7328 AggressorSide::NoAggressor => {
7329 panic!("aggressor_side should be Buyer or Seller, not NoAggressor")
7330 }
7331 }
7332
7333 assert!(
7335 !tick.trade_id.to_string().is_empty(),
7336 "trade_id should be set"
7337 );
7338
7339 assert!(tick.ts_event > 0, "ts_event should be set");
7341 assert!(tick.ts_init > 0, "ts_init should be set");
7342 assert!(
7343 tick.ts_init >= tick.ts_event,
7344 "ts_init should be >= ts_event"
7345 );
7346 }
7347 }
7348
7349 #[tokio::test]
7350 async fn test_trades_response_includes_metadata() {
7351 let (sender, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7353 set_data_event_sender(sender);
7354
7355 let created_at = Utc::now();
7356 let http_trade = crate::http::models::Trade {
7357 id: "metadata-test".to_string(),
7358 side: OrderSide::Buy,
7359 size: dec!(1.0),
7360 price: dec!(100.0),
7361 created_at,
7362 created_at_height: 100,
7363 trade_type: crate::common::enums::DydxTradeType::Limit,
7364 };
7365
7366 let trades_response = crate::http::models::TradesResponse {
7367 trades: vec![http_trade],
7368 };
7369
7370 let state = TradesTestState {
7371 response: Arc::new(trades_response),
7372 last_ticker: Arc::new(tokio::sync::Mutex::new(None)),
7373 last_limit: Arc::new(tokio::sync::Mutex::new(None)),
7374 };
7375
7376 let addr = start_trades_test_server(state).await;
7377 let base_url = format!("http://{addr}");
7378
7379 let client_id = ClientId::from("DYDX-META-TEST");
7380 let config = DydxDataClientConfig {
7381 base_url_http: Some(base_url),
7382 is_testnet: true,
7383 ..Default::default()
7384 };
7385
7386 let http_client = DydxHttpClient::new(
7387 config.base_url_http.clone(),
7388 config.http_timeout_secs,
7389 config.http_proxy_url.clone(),
7390 config.is_testnet,
7391 None,
7392 )
7393 .unwrap();
7394
7395 let client = DydxDataClient::new(client_id, config, http_client, None).unwrap();
7396
7397 let instrument = create_test_instrument_any();
7398 let instrument_id = instrument.id();
7399 let symbol_key = Ustr::from(instrument_id.symbol.as_str());
7400 client.instruments.insert(symbol_key, instrument);
7401
7402 let request_id = UUID4::new();
7403 let request = RequestTrades::new(
7404 instrument_id,
7405 None,
7406 None,
7407 None,
7408 Some(client_id),
7409 request_id,
7410 get_atomic_clock_realtime().get_time_ns(),
7411 None,
7412 );
7413
7414 assert!(client.request_trades(&request).is_ok());
7415
7416 let timeout = tokio::time::Duration::from_millis(500);
7417 if let Ok(Some(DataEvent::Response(DataResponse::Trades(resp)))) =
7418 tokio::time::timeout(timeout, rx.recv()).await
7419 {
7420 assert_eq!(
7422 resp.correlation_id, request_id,
7423 "correlation_id should match request"
7424 );
7425 assert_eq!(resp.client_id, client_id, "client_id should be set");
7426 assert_eq!(
7427 resp.instrument_id, instrument_id,
7428 "instrument_id should be set"
7429 );
7430 assert!(resp.ts_init > 0, "ts_init should be set");
7431
7432 let _start = resp.start;
7433 let _end = resp.end;
7434 let _params = resp.params;
7435 }
7436 }
7437
7438 #[tokio::test]
7439 async fn test_orderbook_cache_growth_with_many_instruments() {
7440 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7441 set_data_event_sender(sender);
7442
7443 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7444 let config = DydxDataClientConfig {
7445 base_url_http: Some(base_url),
7446 is_testnet: true,
7447 ..Default::default()
7448 };
7449
7450 let http_client = DydxHttpClient::new(
7451 config.base_url_http.clone(),
7452 config.http_timeout_secs,
7453 config.http_proxy_url.clone(),
7454 config.is_testnet,
7455 None,
7456 )
7457 .unwrap();
7458
7459 let client =
7460 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7461
7462 let initial_capacity = client.order_books.capacity();
7463
7464 for i in 0..100 {
7465 let symbol = format!("INSTRUMENT-{i}");
7466 let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
7467 client.order_books.insert(
7468 instrument_id,
7469 OrderBook::new(instrument_id, BookType::L2_MBP),
7470 );
7471 }
7472
7473 assert_eq!(client.order_books.len(), 100);
7474 assert!(client.order_books.capacity() >= initial_capacity);
7475
7476 client.order_books.clear();
7477 assert_eq!(client.order_books.len(), 0);
7478 }
7479
7480 #[rstest]
7481 fn test_instrument_id_validation_rejects_invalid_formats() {
7482 let test_cases = vec![
7484 ("", "Empty string missing separator"),
7485 ("INVALID", "No venue separator"),
7486 ("NO-VENUE", "No venue separator"),
7487 (".DYDX", "Empty symbol"),
7488 ("SYMBOL.", "Empty venue"),
7489 ];
7490
7491 for (invalid_id, description) in test_cases {
7492 let result = std::panic::catch_unwind(|| InstrumentId::from(invalid_id));
7493 assert!(
7494 result.is_err(),
7495 "Expected {invalid_id} to panic: {description}"
7496 );
7497 }
7498 }
7499
7500 #[rstest]
7501 fn test_instrument_id_validation_accepts_valid_formats() {
7502 let valid_ids = vec![
7503 "BTC-USD-PERP.DYDX",
7504 "ETH-USD-PERP.DYDX",
7505 "SOL-USD.DYDX",
7506 "AVAX-USD-PERP.DYDX",
7507 ];
7508
7509 for valid_id in valid_ids {
7510 let instrument_id = InstrumentId::from(valid_id);
7511 assert!(
7512 !instrument_id.symbol.as_str().is_empty()
7513 && !instrument_id.venue.as_str().is_empty(),
7514 "Expected {valid_id} to have non-empty symbol and venue"
7515 );
7516 }
7517 }
7518
7519 #[tokio::test]
7520 async fn test_request_bars_with_inverted_date_range() {
7521 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7522 set_data_event_sender(sender);
7523
7524 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7525 let config = DydxDataClientConfig {
7526 base_url_http: Some(base_url),
7527 is_testnet: true,
7528 ..Default::default()
7529 };
7530
7531 let http_client = DydxHttpClient::new(
7532 config.base_url_http.clone(),
7533 config.http_timeout_secs,
7534 config.http_proxy_url.clone(),
7535 config.is_testnet,
7536 None,
7537 )
7538 .unwrap();
7539
7540 let client =
7541 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7542
7543 let instrument = create_test_instrument_any();
7544 let instrument_id = instrument.id();
7545 client
7546 .instruments
7547 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7548
7549 let spec = BarSpecification {
7550 step: std::num::NonZeroUsize::new(1).unwrap(),
7551 aggregation: BarAggregation::Minute,
7552 price_type: PriceType::Last,
7553 };
7554 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7555
7556 let now = Utc::now();
7557 let start = Some(now);
7558 let end = Some(now - chrono::Duration::hours(1));
7559
7560 let request = RequestBars::new(
7561 bar_type,
7562 start,
7563 end,
7564 None,
7565 Some(ClientId::from("dydx_test")),
7566 UUID4::new(),
7567 get_atomic_clock_realtime().get_time_ns(),
7568 None,
7569 );
7570
7571 let result = client.request_bars(&request);
7572 assert!(result.is_ok());
7573 }
7574
7575 #[tokio::test]
7576 async fn test_request_bars_with_zero_limit() {
7577 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7578 set_data_event_sender(sender);
7579
7580 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7581 let config = DydxDataClientConfig {
7582 base_url_http: Some(base_url),
7583 is_testnet: true,
7584 ..Default::default()
7585 };
7586
7587 let http_client = DydxHttpClient::new(
7588 config.base_url_http.clone(),
7589 config.http_timeout_secs,
7590 config.http_proxy_url.clone(),
7591 config.is_testnet,
7592 None,
7593 )
7594 .unwrap();
7595
7596 let client =
7597 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7598
7599 let instrument = create_test_instrument_any();
7600 let instrument_id = instrument.id();
7601 client
7602 .instruments
7603 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7604
7605 let spec = BarSpecification {
7606 step: std::num::NonZeroUsize::new(1).unwrap(),
7607 aggregation: BarAggregation::Minute,
7608 price_type: PriceType::Last,
7609 };
7610 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
7611
7612 let request = RequestBars::new(
7613 bar_type,
7614 None,
7615 None,
7616 Some(std::num::NonZeroUsize::new(1).unwrap()),
7617 Some(ClientId::from("dydx_test")),
7618 UUID4::new(),
7619 get_atomic_clock_realtime().get_time_ns(),
7620 None,
7621 );
7622
7623 let result = client.request_bars(&request);
7624 assert!(result.is_ok());
7625 }
7626
7627 #[tokio::test]
7628 async fn test_request_trades_with_excessive_limit() {
7629 let (sender, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
7630 set_data_event_sender(sender);
7631
7632 let base_url = String::from("https://indexer.v4testnet.dydx.exchange");
7633 let config = DydxDataClientConfig {
7634 base_url_http: Some(base_url),
7635 is_testnet: true,
7636 ..Default::default()
7637 };
7638
7639 let http_client = DydxHttpClient::new(
7640 config.base_url_http.clone(),
7641 config.http_timeout_secs,
7642 config.http_proxy_url.clone(),
7643 config.is_testnet,
7644 None,
7645 )
7646 .unwrap();
7647
7648 let client =
7649 DydxDataClient::new(ClientId::from("dydx_test"), config, http_client, None).unwrap();
7650
7651 let instrument = create_test_instrument_any();
7652 let instrument_id = instrument.id();
7653 client
7654 .instruments
7655 .insert(Ustr::from(instrument_id.symbol.as_str()), instrument);
7656
7657 let request = RequestTrades::new(
7658 instrument_id,
7659 None,
7660 None,
7661 Some(std::num::NonZeroUsize::new(100_000).unwrap()),
7662 Some(ClientId::from("dydx_test")),
7663 UUID4::new(),
7664 get_atomic_clock_realtime().get_time_ns(),
7665 None,
7666 );
7667
7668 let result = client.request_trades(&request);
7669 assert!(result.is_ok());
7670 }
7671
7672 #[rstest]
7673 fn test_candle_topic_format() {
7674 let instrument_id = InstrumentId::new(Symbol::from("BTC-USD-PERP"), Venue::from("DYDX"));
7675 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
7676 let resolution = "1MIN";
7677 let topic = format!("{ticker}/{resolution}");
7678
7679 assert_eq!(topic, "BTC-USD/1MIN");
7680 assert!(!topic.contains("-PERP"));
7681 assert!(!topic.contains(".DYDX"));
7682 }
7683}