1use std::{
54 collections::HashMap,
55 fmt::{Debug, Formatter},
56 num::NonZeroU32,
57 sync::{
58 Arc, LazyLock,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::consts::NAUTILUS_USER_AGENT;
66use nautilus_model::{
67 identifiers::InstrumentId,
68 instruments::{Instrument, InstrumentAny},
69};
70use nautilus_network::{
71 http::HttpClient,
72 ratelimiter::quota::Quota,
73 retry::{RetryConfig, RetryManager},
74};
75use reqwest::{Method, header::USER_AGENT};
76use serde::{Deserialize, Serialize, de::DeserializeOwned};
77use tokio_util::sync::CancellationToken;
78use ustr::Ustr;
79
80use super::error::DydxHttpError;
81use crate::common::{
82 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
83 enums::DydxCandleResolution,
84};
85
86pub static DYDX_REST_QUOTA: LazyLock<Quota> =
92 LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
93
94#[derive(Debug, Serialize, Deserialize)]
99pub struct DydxResponse<T> {
100 pub data: T,
102}
103
104pub struct DydxRawHttpClient {
114 base_url: String,
115 client: HttpClient,
116 retry_manager: RetryManager<DydxHttpError>,
117 cancellation_token: CancellationToken,
118 is_testnet: bool,
119}
120
121impl Default for DydxRawHttpClient {
122 fn default() -> Self {
123 Self::new(None, Some(60), None, false, None)
124 .expect("Failed to create default DydxRawHttpClient")
125 }
126}
127
128impl Debug for DydxRawHttpClient {
129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct(stringify!(DydxRawHttpClient))
131 .field("base_url", &self.base_url)
132 .field("is_testnet", &self.is_testnet)
133 .finish_non_exhaustive()
134 }
135}
136
137impl DydxRawHttpClient {
138 pub fn cancel_all_requests(&self) {
140 self.cancellation_token.cancel();
141 }
142
143 pub fn cancellation_token(&self) -> &CancellationToken {
145 &self.cancellation_token
146 }
147
148 pub fn new(
157 base_url: Option<String>,
158 timeout_secs: Option<u64>,
159 proxy_url: Option<String>,
160 is_testnet: bool,
161 retry_config: Option<RetryConfig>,
162 ) -> anyhow::Result<Self> {
163 let base_url = if is_testnet {
164 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
165 } else {
166 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
167 };
168
169 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
170
171 let mut headers = HashMap::new();
173 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
174
175 let client = HttpClient::new(
176 headers,
177 vec![], vec![], Some(*DYDX_REST_QUOTA),
180 timeout_secs,
181 proxy_url,
182 )
183 .map_err(|e| {
184 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
185 })?;
186
187 Ok(Self {
188 base_url,
189 client,
190 retry_manager,
191 cancellation_token: CancellationToken::new(),
192 is_testnet,
193 })
194 }
195
196 #[must_use]
198 pub const fn is_testnet(&self) -> bool {
199 self.is_testnet
200 }
201
202 #[must_use]
204 pub fn base_url(&self) -> &str {
205 &self.base_url
206 }
207
208 pub async fn send_request<T>(
220 &self,
221 method: Method,
222 endpoint: &str,
223 query_params: Option<&str>,
224 ) -> Result<T, DydxHttpError>
225 where
226 T: DeserializeOwned,
227 {
228 let url = if let Some(params) = query_params {
229 format!("{}{endpoint}?{params}", self.base_url)
230 } else {
231 format!("{}{endpoint}", self.base_url)
232 };
233
234 let operation = || async {
235 let request = self
236 .client
237 .request_with_ustr_keys(
238 method.clone(),
239 url.clone(),
240 None, None, None, None, None, )
246 .await
247 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
248
249 if !request.status.is_success() {
251 return Err(DydxHttpError::HttpStatus {
252 status: request.status.as_u16(),
253 message: String::from_utf8_lossy(&request.body).to_string(),
254 });
255 }
256
257 Ok(request)
258 };
259
260 let should_retry = |error: &DydxHttpError| -> bool {
265 match error {
266 DydxHttpError::HttpClientError(_) => true,
267 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
268 _ => false,
269 }
270 };
271
272 let create_error = |msg: String| -> DydxHttpError {
273 if msg == "canceled" {
274 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
275 } else {
276 DydxHttpError::ValidationError(msg)
277 }
278 };
279
280 let response = self
282 .retry_manager
283 .execute_with_retry_with_cancel(
284 endpoint,
285 operation,
286 should_retry,
287 create_error,
288 &self.cancellation_token,
289 )
290 .await?;
291
292 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
294 error: e.to_string(),
295 body: String::from_utf8_lossy(&response.body).to_string(),
296 })
297 }
298
299 pub async fn send_post_request<T, B>(
312 &self,
313 endpoint: &str,
314 body: &B,
315 ) -> Result<T, DydxHttpError>
316 where
317 T: DeserializeOwned,
318 B: Serialize,
319 {
320 let url = format!("{}{endpoint}", self.base_url);
321
322 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
323 error: e.to_string(),
324 })?;
325
326 let operation = || async {
327 let request = self
328 .client
329 .request_with_ustr_keys(
330 Method::POST,
331 url.clone(),
332 None, None, Some(body_bytes.clone()),
335 None, None, )
338 .await
339 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
340
341 if !request.status.is_success() {
343 return Err(DydxHttpError::HttpStatus {
344 status: request.status.as_u16(),
345 message: String::from_utf8_lossy(&request.body).to_string(),
346 });
347 }
348
349 Ok(request)
350 };
351
352 let should_retry = |error: &DydxHttpError| -> bool {
354 match error {
355 DydxHttpError::HttpClientError(_) => true,
356 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
357 _ => false,
358 }
359 };
360
361 let create_error = |msg: String| -> DydxHttpError {
362 if msg == "canceled" {
363 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
364 } else {
365 DydxHttpError::ValidationError(msg)
366 }
367 };
368
369 let response = self
371 .retry_manager
372 .execute_with_retry_with_cancel(
373 endpoint,
374 operation,
375 should_retry,
376 create_error,
377 &self.cancellation_token,
378 )
379 .await?;
380
381 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
383 error: e.to_string(),
384 body: String::from_utf8_lossy(&response.body).to_string(),
385 })
386 }
387
388 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
398 self.send_request(Method::GET, "/v4/perpetualMarkets", None)
399 .await
400 }
401
402 pub async fn fetch_instruments(
415 &self,
416 maker_fee: Option<rust_decimal::Decimal>,
417 taker_fee: Option<rust_decimal::Decimal>,
418 ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
419 use nautilus_core::time::get_atomic_clock_realtime;
420
421 let markets_response = self.get_markets().await?;
422 let ts_init = get_atomic_clock_realtime().get_time_ns();
423
424 let mut instruments = Vec::new();
425 let mut skipped = 0;
426
427 for (ticker, market) in markets_response.markets {
428 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
429 Ok(instrument) => {
430 instruments.push(instrument);
431 }
432 Err(e) => {
433 tracing::warn!("Failed to parse instrument {ticker}: {e}");
434 skipped += 1;
435 }
436 }
437 }
438
439 if skipped > 0 {
440 tracing::info!(
441 "Parsed {} instruments, skipped {} (inactive or invalid)",
442 instruments.len(),
443 skipped
444 );
445 } else {
446 tracing::info!("Parsed {} instruments", instruments.len());
447 }
448
449 Ok(instruments)
450 }
451
452 pub async fn get_orderbook(
462 &self,
463 ticker: &str,
464 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
465 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
466 self.send_request(Method::GET, &endpoint, None).await
467 }
468
469 pub async fn get_trades(
475 &self,
476 ticker: &str,
477 limit: Option<u32>,
478 ) -> Result<super::models::TradesResponse, DydxHttpError> {
479 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
480 let query = limit.map(|l| format!("limit={l}"));
481 self.send_request(Method::GET, &endpoint, query.as_deref())
482 .await
483 }
484
485 pub async fn get_candles(
491 &self,
492 ticker: &str,
493 resolution: DydxCandleResolution,
494 limit: Option<u32>,
495 from_iso: Option<DateTime<Utc>>,
496 to_iso: Option<DateTime<Utc>>,
497 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
498 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
499 let mut query_parts = vec![format!("resolution={}", resolution)];
500 if let Some(l) = limit {
501 query_parts.push(format!("limit={l}"));
502 }
503 if let Some(from) = from_iso {
504 let from_str = from.to_rfc3339();
505 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
506 }
507 if let Some(to) = to_iso {
508 let to_str = to.to_rfc3339();
509 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
510 }
511 let query = query_parts.join("&");
512 self.send_request(Method::GET, &endpoint, Some(&query))
513 .await
514 }
515
516 pub async fn get_subaccount(
526 &self,
527 address: &str,
528 subaccount_number: u32,
529 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
530 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
531 self.send_request(Method::GET, &endpoint, None).await
532 }
533
534 pub async fn get_fills(
540 &self,
541 address: &str,
542 subaccount_number: u32,
543 market: Option<&str>,
544 limit: Option<u32>,
545 ) -> Result<super::models::FillsResponse, DydxHttpError> {
546 let endpoint = "/v4/fills";
547 let mut query_parts = vec![
548 format!("address={address}"),
549 format!("subaccountNumber={subaccount_number}"),
550 ];
551 if let Some(m) = market {
552 query_parts.push(format!("market={m}"));
553 }
554 if let Some(l) = limit {
555 query_parts.push(format!("limit={l}"));
556 }
557 let query = query_parts.join("&");
558 self.send_request(Method::GET, endpoint, Some(&query)).await
559 }
560
561 pub async fn get_orders(
567 &self,
568 address: &str,
569 subaccount_number: u32,
570 market: Option<&str>,
571 limit: Option<u32>,
572 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
573 let endpoint = "/v4/orders";
574 let mut query_parts = vec![
575 format!("address={address}"),
576 format!("subaccountNumber={subaccount_number}"),
577 ];
578 if let Some(m) = market {
579 query_parts.push(format!("market={m}"));
580 }
581 if let Some(l) = limit {
582 query_parts.push(format!("limit={l}"));
583 }
584 let query = query_parts.join("&");
585 self.send_request(Method::GET, endpoint, Some(&query)).await
586 }
587
588 pub async fn get_transfers(
594 &self,
595 address: &str,
596 subaccount_number: u32,
597 limit: Option<u32>,
598 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
599 let endpoint = "/v4/transfers";
600 let mut query_parts = vec![
601 format!("address={address}"),
602 format!("subaccountNumber={subaccount_number}"),
603 ];
604 if let Some(l) = limit {
605 query_parts.push(format!("limit={l}"));
606 }
607 let query = query_parts.join("&");
608 self.send_request(Method::GET, endpoint, Some(&query)).await
609 }
610
611 pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
621 self.send_request(Method::GET, "/v4/time", None).await
622 }
623
624 pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
630 self.send_request(Method::GET, "/v4/height", None).await
631 }
632}
633
634#[derive(Debug)]
650#[cfg_attr(
651 feature = "python",
652 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
653)]
654pub struct DydxHttpClient {
655 pub(crate) inner: Arc<DydxRawHttpClient>,
657 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
659 pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
664 cache_initialized: AtomicBool,
666}
667
668impl Clone for DydxHttpClient {
669 fn clone(&self) -> Self {
670 let cache_initialized = AtomicBool::new(false);
671 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
672 if is_initialized {
673 cache_initialized.store(true, Ordering::Release);
674 }
675
676 Self {
677 inner: self.inner.clone(),
678 instruments_cache: self.instruments_cache.clone(),
679 clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
680 cache_initialized,
681 }
682 }
683}
684
685impl Default for DydxHttpClient {
686 fn default() -> Self {
687 Self::new(None, Some(60), None, false, None)
688 .expect("Failed to create default DydxHttpClient")
689 }
690}
691
692impl DydxHttpClient {
693 pub fn new(
703 base_url: Option<String>,
704 timeout_secs: Option<u64>,
705 proxy_url: Option<String>,
706 is_testnet: bool,
707 retry_config: Option<RetryConfig>,
708 ) -> anyhow::Result<Self> {
709 Ok(Self {
710 inner: Arc::new(DydxRawHttpClient::new(
711 base_url,
712 timeout_secs,
713 proxy_url,
714 is_testnet,
715 retry_config,
716 )?),
717 instruments_cache: Arc::new(DashMap::new()),
718 clob_pair_id_to_instrument: Arc::new(DashMap::new()),
719 cache_initialized: AtomicBool::new(false),
720 })
721 }
722
723 pub async fn request_instruments(
733 &self,
734 symbol: Option<String>,
735 maker_fee: Option<rust_decimal::Decimal>,
736 taker_fee: Option<rust_decimal::Decimal>,
737 ) -> anyhow::Result<Vec<InstrumentAny>> {
738 use nautilus_core::time::get_atomic_clock_realtime;
739
740 let markets_response = self.inner.get_markets().await?;
741 let ts_init = get_atomic_clock_realtime().get_time_ns();
742
743 let mut instruments = Vec::new();
744 let mut skipped = 0;
745
746 for (ticker, market) in markets_response.markets {
747 if let Some(ref sym) = symbol
749 && ticker != *sym
750 {
751 continue;
752 }
753
754 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
756 Ok(instrument) => {
757 instruments.push(instrument);
758 }
759 Err(e) => {
760 tracing::warn!("Failed to parse instrument {ticker}: {e}");
761 skipped += 1;
762 }
763 }
764 }
765
766 if skipped > 0 {
767 tracing::info!(
768 "Parsed {} instruments, skipped {} (inactive or invalid)",
769 instruments.len(),
770 skipped
771 );
772 } else {
773 tracing::debug!("Parsed {} instruments", instruments.len());
774 }
775
776 Ok(instruments)
777 }
778
779 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
788 use nautilus_core::time::get_atomic_clock_realtime;
789
790 self.instruments_cache.clear();
791 self.clob_pair_id_to_instrument.clear();
792
793 let markets_response = self.inner.get_markets().await?;
794 let ts_init = get_atomic_clock_realtime().get_time_ns();
795
796 let mut instruments = Vec::new();
797 let mut skipped = 0;
798
799 for (ticker, market) in markets_response.markets {
800 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
802 Ok(instrument) => {
803 let instrument_id = instrument.id();
804 let symbol = instrument_id.symbol.inner();
805 self.instruments_cache.insert(symbol, instrument.clone());
806
807 self.clob_pair_id_to_instrument
809 .insert(market.clob_pair_id, instrument_id);
810
811 instruments.push(instrument);
812 }
813 Err(e) => {
814 tracing::warn!("Failed to parse instrument {ticker}: {e}");
815 skipped += 1;
816 }
817 }
818 }
819
820 if !instruments.is_empty() {
821 self.cache_initialized.store(true, Ordering::Release);
822 }
823
824 if skipped > 0 {
825 tracing::info!(
826 "Cached {} instruments, skipped {} (inactive or invalid)",
827 instruments.len(),
828 skipped
829 );
830 } else {
831 tracing::info!("Cached {} instruments", instruments.len());
832 }
833
834 Ok(())
835 }
836
837 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
841 for inst in instruments {
842 let symbol = inst.id().symbol.inner();
843 self.instruments_cache.insert(symbol, inst);
844 }
845 self.cache_initialized.store(true, Ordering::Release);
846 }
847
848 pub fn cache_instrument(&self, instrument: InstrumentAny) {
852 let symbol = instrument.id().symbol.inner();
853 self.instruments_cache.insert(symbol, instrument);
854 self.cache_initialized.store(true, Ordering::Release);
855 }
856
857 #[must_use]
859 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
860 self.instruments_cache
861 .get(symbol)
862 .map(|entry| entry.clone())
863 }
864
865 #[must_use]
869 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
870 let instrument_id = self
872 .clob_pair_id_to_instrument
873 .get(&clob_pair_id)
874 .map(|entry| *entry)?;
875
876 self.get_instrument(&instrument_id.symbol.inner())
878 }
879
880 pub async fn request_trades(
889 &self,
890 symbol: &str,
891 limit: Option<u32>,
892 ) -> anyhow::Result<super::models::TradesResponse> {
893 self.inner
894 .get_trades(symbol, limit)
895 .await
896 .map_err(Into::into)
897 }
898
899 pub async fn request_candles(
908 &self,
909 symbol: &str,
910 resolution: DydxCandleResolution,
911 limit: Option<u32>,
912 from_iso: Option<DateTime<Utc>>,
913 to_iso: Option<DateTime<Utc>>,
914 ) -> anyhow::Result<super::models::CandlesResponse> {
915 self.inner
916 .get_candles(symbol, resolution, limit, from_iso, to_iso)
917 .await
918 .map_err(Into::into)
919 }
920
921 #[allow(dead_code)]
934 async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
935 if let Some(instrument) = self.get_instrument(&symbol) {
936 return Ok(instrument);
937 }
938
939 let instruments = self
941 .request_instruments(Some(symbol.to_string()), None, None)
942 .await?;
943
944 instruments
945 .into_iter()
946 .next()
947 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {symbol}"))
948 }
949
950 #[must_use]
956 pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
957 &self.inner
958 }
959
960 #[must_use]
962 pub fn is_testnet(&self) -> bool {
963 self.inner.is_testnet()
964 }
965
966 #[must_use]
968 pub fn base_url(&self) -> &str {
969 self.inner.base_url()
970 }
971
972 #[must_use]
974 pub fn is_cache_initialized(&self) -> bool {
975 self.cache_initialized.load(Ordering::Acquire)
976 }
977
978 #[must_use]
980 pub fn cached_instruments_count(&self) -> usize {
981 self.instruments_cache.len()
982 }
983
984 #[must_use]
986 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
987 &self.instruments_cache
988 }
989
990 #[must_use]
995 pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
996 &self.clob_pair_id_to_instrument
997 }
998}
999
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client without overrides targets the mainnet URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
    }

    /// A raw testnet client without a URL override targets the testnet URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A new domain client targets mainnet and starts with an empty cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
        assert_eq!(client.cached_instruments_count(), 0);
    }

    /// A domain testnet client without a URL override targets the testnet URL.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let client = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// The default domain client targets mainnet with an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let client = DydxHttpClient::default();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
    }

    /// Cloning copies the `cache_initialized` flag as of the clone call.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        let cloned = client.clone();
        assert!(!cloned.is_cache_initialized());

        client.cache_initialized.store(true, Ordering::Release);

        // The clone is the behavior under test, so it is intentionally kept.
        #[allow(clippy::redundant_clone)]
        let cloned_after = client.clone();
        assert!(cloned_after.is_cache_initialized());
    }

    /// Caching one instrument makes it retrievable and marks the cache initialized.
    #[rstest]
    fn test_domain_client_cache_instrument() {
        use nautilus_model::{
            identifiers::{InstrumentId, Symbol},
            instruments::CryptoPerpetual,
            types::{Currency, Price, Quantity},
        };

        let client = DydxHttpClient::default();
        assert_eq!(client.cached_instruments_count(), 0);

        let instrument_id =
            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
        let price = Price::from("1.0");
        let size = Quantity::from("0.001");
        let instrument = CryptoPerpetual::new(
            instrument_id,
            Symbol::from("BTC-USD"),
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            price.precision,
            size.precision,
            price,
            size,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
        assert_eq!(client.cached_instruments_count(), 1);
        assert!(client.is_cache_initialized());

        let btc_usd = Ustr::from("BTC-USD");
        let cached = client.get_instrument(&btc_usd);
        assert!(cached.is_some());
    }

    /// Looking up a symbol that was never cached returns `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let client = DydxHttpClient::default();
        let eth_usd = Ustr::from("ETH-USD");
        let result = client.get_instrument(&eth_usd);
        assert!(result.is_none());
    }

    /// A slow endpoint must fail within the retry manager's time budget
    /// instead of blocking for the full HTTP timeout.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use axum::{Router, routing::get};
        use tokio::net::TcpListener;

        // Responds well after the 500 ms operation timeout configured below.
        async fn slow_handler() -> &'static str {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            "ok"
        }

        let router = Router::new().route("/v4/slow", get(slow_handler));

        // Port 0 lets the OS pick a free port for the local test server.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{addr}");

        // No retries and tight time limits so the test finishes quickly.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The 60 s HTTP timeout is deliberately much longer than the retry budget.
        let client =
            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
                .unwrap();

        let start = std::time::Instant::now();
        let result: Result<serde_json::Value, error::DydxHttpError> =
            client.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(elapsed < std::time::Duration::from_secs(3));
    }
}