nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Key Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
//! - Deserialization of JSON response bodies into typed response models via `serde_json`.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official documentation
46//!
47//! | Endpoint                             | Reference                                              |
48//! |--------------------------------------|--------------------------------------------------------|
49//! | Market data                          | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets> |
50//! | Account data                         | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints                    | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility> |
52
53use std::{
54    collections::HashMap,
55    fmt::{Debug, Formatter},
56    num::NonZeroU32,
57    sync::{
58        Arc, LazyLock,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::consts::NAUTILUS_USER_AGENT;
66use nautilus_model::{
67    identifiers::InstrumentId,
68    instruments::{Instrument, InstrumentAny},
69};
70use nautilus_network::{
71    http::HttpClient,
72    ratelimiter::quota::Quota,
73    retry::{RetryConfig, RetryManager},
74};
75use reqwest::{Method, header::USER_AGENT};
76use serde::{Deserialize, Serialize, de::DeserializeOwned};
77use tokio_util::sync::CancellationToken;
78use ustr::Ustr;
79
80use super::error::DydxHttpError;
81use crate::common::{
82    consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
83    enums::DydxCandleResolution,
84};
85
86/// Default dYdX Indexer REST API rate limit.
87///
88/// The dYdX Indexer API rate limits are generous for read-only operations:
89/// - General: 100 requests per 10 seconds per IP
90/// - We use a conservative 10 requests per second as the default quota.
91pub static DYDX_REST_QUOTA: LazyLock<Quota> =
92    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
93
94/// Represents a dYdX HTTP response wrapper.
95///
96/// Most dYdX Indexer API endpoints return data directly without a wrapper,
97/// but some endpoints may use this structure for consistency.
98#[derive(Debug, Serialize, Deserialize)]
99pub struct DydxResponse<T> {
100    /// The typed data returned by the dYdX endpoint.
101    pub data: T,
102}
103
104/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
105///
106/// This client wraps the underlying [`HttpClient`] to handle functionality
107/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
108/// and deserializing responses into dYdX specific data models.
109///
110/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
111/// does NOT require authentication, API keys, or request signing. All endpoints are
112/// publicly accessible.
113pub struct DydxRawHttpClient {
114    base_url: String,
115    client: HttpClient,
116    retry_manager: RetryManager<DydxHttpError>,
117    cancellation_token: CancellationToken,
118    is_testnet: bool,
119}
120
121impl Default for DydxRawHttpClient {
122    fn default() -> Self {
123        Self::new(None, Some(60), None, false, None)
124            .expect("Failed to create default DydxRawHttpClient")
125    }
126}
127
128impl Debug for DydxRawHttpClient {
129    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct(stringify!(DydxRawHttpClient))
131            .field("base_url", &self.base_url)
132            .field("is_testnet", &self.is_testnet)
133            .finish_non_exhaustive()
134    }
135}
136
137impl DydxRawHttpClient {
138    /// Cancel all pending HTTP requests.
139    pub fn cancel_all_requests(&self) {
140        self.cancellation_token.cancel();
141    }
142
143    /// Get the cancellation token for this client.
144    pub fn cancellation_token(&self) -> &CancellationToken {
145        &self.cancellation_token
146    }
147
148    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
149    /// optionally overridden with a custom base URL.
150    ///
151    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the retry manager cannot be created.
156    pub fn new(
157        base_url: Option<String>,
158        timeout_secs: Option<u64>,
159        proxy_url: Option<String>,
160        is_testnet: bool,
161        retry_config: Option<RetryConfig>,
162    ) -> anyhow::Result<Self> {
163        let base_url = if is_testnet {
164            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
165        } else {
166            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
167        };
168
169        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
170
171        // Build headers
172        let mut headers = HashMap::new();
173        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
174
175        let client = HttpClient::new(
176            headers,
177            vec![], // No specific headers to extract from responses
178            vec![], // No keyed quotas (we use a single global quota)
179            Some(*DYDX_REST_QUOTA),
180            timeout_secs,
181            proxy_url,
182        )
183        .map_err(|e| {
184            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
185        })?;
186
187        Ok(Self {
188            base_url,
189            client,
190            retry_manager,
191            cancellation_token: CancellationToken::new(),
192            is_testnet,
193        })
194    }
195
196    /// Check if this client is configured for testnet.
197    #[must_use]
198    pub const fn is_testnet(&self) -> bool {
199        self.is_testnet
200    }
201
202    /// Get the base URL being used by this client.
203    #[must_use]
204    pub fn base_url(&self) -> &str {
205        &self.base_url
206    }
207
208    /// Send a request to a dYdX Indexer API endpoint.
209    ///
210    /// **Note**: dYdX Indexer API does not require authentication headers.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if:
215    /// - The HTTP request fails.
216    /// - The response has a non-success HTTP status code.
217    /// - The response body cannot be deserialized to type `T`.
218    /// - The request is canceled.
219    pub async fn send_request<T>(
220        &self,
221        method: Method,
222        endpoint: &str,
223        query_params: Option<&str>,
224    ) -> Result<T, DydxHttpError>
225    where
226        T: DeserializeOwned,
227    {
228        let url = if let Some(params) = query_params {
229            format!("{}{endpoint}?{params}", self.base_url)
230        } else {
231            format!("{}{endpoint}", self.base_url)
232        };
233
234        let operation = || async {
235            let request = self
236                .client
237                .request_with_ustr_keys(
238                    method.clone(),
239                    url.clone(),
240                    None, // No params
241                    None, // No additional headers
242                    None, // No body for GET requests
243                    None, // Use default timeout
244                    None, // No specific rate limit keys (using global quota)
245                )
246                .await
247                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
248
249            // Check for HTTP errors
250            if !request.status.is_success() {
251                return Err(DydxHttpError::HttpStatus {
252                    status: request.status.as_u16(),
253                    message: String::from_utf8_lossy(&request.body).to_string(),
254                });
255            }
256
257            Ok(request)
258        };
259
260        // Retry strategy for dYdX Indexer API:
261        // 1. Network errors: always retry (transient connection issues)
262        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
263        // 3. Client errors (4xx except 429): should NOT be retried
264        let should_retry = |error: &DydxHttpError| -> bool {
265            match error {
266                DydxHttpError::HttpClientError(_) => true,
267                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
268                _ => false,
269            }
270        };
271
272        let create_error = |msg: String| -> DydxHttpError {
273            if msg == "canceled" {
274                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
275            } else {
276                DydxHttpError::ValidationError(msg)
277            }
278        };
279
280        // Execute request with retry logic
281        let response = self
282            .retry_manager
283            .execute_with_retry_with_cancel(
284                endpoint,
285                operation,
286                should_retry,
287                create_error,
288                &self.cancellation_token,
289            )
290            .await?;
291
292        // Deserialize response
293        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
294            error: e.to_string(),
295            body: String::from_utf8_lossy(&response.body).to_string(),
296        })
297    }
298
299    /// Send a POST request to a dYdX Indexer API endpoint.
300    ///
301    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if:
306    /// - The request body cannot be serialized to JSON.
307    /// - The HTTP request fails.
308    /// - The response has a non-success HTTP status code.
309    /// - The response body cannot be deserialized to type `T`.
310    /// - The request is canceled.
311    pub async fn send_post_request<T, B>(
312        &self,
313        endpoint: &str,
314        body: &B,
315    ) -> Result<T, DydxHttpError>
316    where
317        T: DeserializeOwned,
318        B: Serialize,
319    {
320        let url = format!("{}{endpoint}", self.base_url);
321
322        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
323            error: e.to_string(),
324        })?;
325
326        let operation = || async {
327            let request = self
328                .client
329                .request_with_ustr_keys(
330                    Method::POST,
331                    url.clone(),
332                    None, // No params
333                    None, // No additional headers (content-type handled by body)
334                    Some(body_bytes.clone()),
335                    None, // Use default timeout
336                    None, // No specific rate limit keys (using global quota)
337                )
338                .await
339                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
340
341            // Check for HTTP errors
342            if !request.status.is_success() {
343                return Err(DydxHttpError::HttpStatus {
344                    status: request.status.as_u16(),
345                    message: String::from_utf8_lossy(&request.body).to_string(),
346                });
347            }
348
349            Ok(request)
350        };
351
352        // Retry strategy (same as GET requests)
353        let should_retry = |error: &DydxHttpError| -> bool {
354            match error {
355                DydxHttpError::HttpClientError(_) => true,
356                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
357                _ => false,
358            }
359        };
360
361        let create_error = |msg: String| -> DydxHttpError {
362            if msg == "canceled" {
363                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
364            } else {
365                DydxHttpError::ValidationError(msg)
366            }
367        };
368
369        // Execute request with retry logic
370        let response = self
371            .retry_manager
372            .execute_with_retry_with_cancel(
373                endpoint,
374                operation,
375                should_retry,
376                create_error,
377                &self.cancellation_token,
378            )
379            .await?;
380
381        // Deserialize response
382        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
383            error: e.to_string(),
384            body: String::from_utf8_lossy(&response.body).to_string(),
385        })
386    }
387
388    // ========================================================================
389    // Markets Endpoints
390    // ========================================================================
391
392    /// Fetch all perpetual markets from dYdX.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the HTTP request fails or response parsing fails.
397    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
398        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
399            .await
400    }
401
402    /// Fetch all instruments and parse them into Nautilus `InstrumentAny` types.
403    ///
404    /// This method fetches all perpetual markets from dYdX and converts them
405    /// into Nautilus instrument definitions using the `parse_instrument_any` function.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if:
410    /// - The HTTP request fails.
411    /// - The response cannot be parsed.
412    /// - Any instrument parsing fails.
413    ///
414    pub async fn fetch_instruments(
415        &self,
416        maker_fee: Option<rust_decimal::Decimal>,
417        taker_fee: Option<rust_decimal::Decimal>,
418    ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
419        use nautilus_core::time::get_atomic_clock_realtime;
420
421        let markets_response = self.get_markets().await?;
422        let ts_init = get_atomic_clock_realtime().get_time_ns();
423
424        let mut instruments = Vec::new();
425        let mut skipped = 0;
426
427        for (ticker, market) in markets_response.markets {
428            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
429                Ok(instrument) => {
430                    instruments.push(instrument);
431                }
432                Err(e) => {
433                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
434                    skipped += 1;
435                }
436            }
437        }
438
439        if skipped > 0 {
440            tracing::info!(
441                "Parsed {} instruments, skipped {} (inactive or invalid)",
442                instruments.len(),
443                skipped
444            );
445        } else {
446            tracing::info!("Parsed {} instruments", instruments.len());
447        }
448
449        Ok(instruments)
450    }
451
452    // ========================================================================
    // Market Data Endpoints
454    // ========================================================================
455
456    /// Fetch orderbook for a specific market.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if the HTTP request fails or response parsing fails.
461    pub async fn get_orderbook(
462        &self,
463        ticker: &str,
464    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
465        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
466        self.send_request(Method::GET, &endpoint, None).await
467    }
468
469    /// Fetch recent trades for a market.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if the HTTP request fails or response parsing fails.
474    pub async fn get_trades(
475        &self,
476        ticker: &str,
477        limit: Option<u32>,
478    ) -> Result<super::models::TradesResponse, DydxHttpError> {
479        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
480        let query = limit.map(|l| format!("limit={l}"));
481        self.send_request(Method::GET, &endpoint, query.as_deref())
482            .await
483    }
484
485    /// Fetch candles/klines for a market.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if the HTTP request fails or response parsing fails.
490    pub async fn get_candles(
491        &self,
492        ticker: &str,
493        resolution: DydxCandleResolution,
494        limit: Option<u32>,
495        from_iso: Option<DateTime<Utc>>,
496        to_iso: Option<DateTime<Utc>>,
497    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
498        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
499        let mut query_parts = vec![format!("resolution={}", resolution)];
500        if let Some(l) = limit {
501            query_parts.push(format!("limit={l}"));
502        }
503        if let Some(from) = from_iso {
504            let from_str = from.to_rfc3339();
505            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
506        }
507        if let Some(to) = to_iso {
508            let to_str = to.to_rfc3339();
509            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
510        }
511        let query = query_parts.join("&");
512        self.send_request(Method::GET, &endpoint, Some(&query))
513            .await
514    }
515
516    // ========================================================================
517    // Account Endpoints
518    // ========================================================================
519
520    /// Fetch subaccount information.
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if the HTTP request fails or response parsing fails.
525    pub async fn get_subaccount(
526        &self,
527        address: &str,
528        subaccount_number: u32,
529    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
530        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
531        self.send_request(Method::GET, &endpoint, None).await
532    }
533
534    /// Fetch fills for a subaccount.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if the HTTP request fails or response parsing fails.
539    pub async fn get_fills(
540        &self,
541        address: &str,
542        subaccount_number: u32,
543        market: Option<&str>,
544        limit: Option<u32>,
545    ) -> Result<super::models::FillsResponse, DydxHttpError> {
546        let endpoint = "/v4/fills";
547        let mut query_parts = vec![
548            format!("address={address}"),
549            format!("subaccountNumber={subaccount_number}"),
550        ];
551        if let Some(m) = market {
552            query_parts.push(format!("market={m}"));
553        }
554        if let Some(l) = limit {
555            query_parts.push(format!("limit={l}"));
556        }
557        let query = query_parts.join("&");
558        self.send_request(Method::GET, endpoint, Some(&query)).await
559    }
560
561    /// Fetch orders for a subaccount.
562    ///
563    /// # Errors
564    ///
565    /// Returns an error if the HTTP request fails or response parsing fails.
566    pub async fn get_orders(
567        &self,
568        address: &str,
569        subaccount_number: u32,
570        market: Option<&str>,
571        limit: Option<u32>,
572    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
573        let endpoint = "/v4/orders";
574        let mut query_parts = vec![
575            format!("address={address}"),
576            format!("subaccountNumber={subaccount_number}"),
577        ];
578        if let Some(m) = market {
579            query_parts.push(format!("market={m}"));
580        }
581        if let Some(l) = limit {
582            query_parts.push(format!("limit={l}"));
583        }
584        let query = query_parts.join("&");
585        self.send_request(Method::GET, endpoint, Some(&query)).await
586    }
587
588    /// Fetch transfers for a subaccount.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if the HTTP request fails or response parsing fails.
593    pub async fn get_transfers(
594        &self,
595        address: &str,
596        subaccount_number: u32,
597        limit: Option<u32>,
598    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
599        let endpoint = "/v4/transfers";
600        let mut query_parts = vec![
601            format!("address={address}"),
602            format!("subaccountNumber={subaccount_number}"),
603        ];
604        if let Some(l) = limit {
605            query_parts.push(format!("limit={l}"));
606        }
607        let query = query_parts.join("&");
608        self.send_request(Method::GET, endpoint, Some(&query)).await
609    }
610
611    // ========================================================================
612    // Utility Endpoints
613    // ========================================================================
614
615    /// Get current server time.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the HTTP request fails or response parsing fails.
620    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
621        self.send_request(Method::GET, "/v4/time", None).await
622    }
623
624    /// Get current blockchain height.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the HTTP request fails or response parsing fails.
629    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
630        self.send_request(Method::GET, "/v4/height", None).await
631    }
632}
633
634/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
635///
636/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
637/// into the Nautilus domain model, following the two-layer pattern established
638/// in OKX, Bybit, and BitMEX adapters.
639///
640/// **Architecture:**
641/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
642/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
643///
644/// The domain client:
645/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
646/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
647/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
648/// - Tracks cache initialization state for optimizations.
649#[derive(Debug)]
650#[cfg_attr(
651    feature = "python",
652    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
653)]
654pub struct DydxHttpClient {
655    /// Raw HTTP client wrapped in Arc for efficient cloning.
656    pub(crate) inner: Arc<DydxRawHttpClient>,
657    /// Instrument cache shared across the adapter using DashMap for thread-safe access.
658    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
659    /// Cached mapping from CLOB pair ID → InstrumentId for efficient lookups.
660    ///
661    /// This is populated from HTTP PerpetualMarket metadata (`clob_pair_id`) alongside
662    /// instrument creation to avoid re-deriving IDs from symbols or other heuristics.
663    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
664    /// Tracks whether the instrument cache has been initialized.
665    cache_initialized: AtomicBool,
666}
667
668impl Clone for DydxHttpClient {
669    fn clone(&self) -> Self {
670        let cache_initialized = AtomicBool::new(false);
671        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
672        if is_initialized {
673            cache_initialized.store(true, Ordering::Release);
674        }
675
676        Self {
677            inner: self.inner.clone(),
678            instruments_cache: self.instruments_cache.clone(),
679            clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
680            cache_initialized,
681        }
682    }
683}
684
685impl Default for DydxHttpClient {
686    fn default() -> Self {
687        Self::new(None, Some(60), None, false, None)
688            .expect("Failed to create default DydxHttpClient")
689    }
690}
691
692impl DydxHttpClient {
693    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
694    /// optionally overridden with a custom base URL.
695    ///
696    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
697    /// Order submission and trading operations use gRPC with blockchain transaction signing.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
702    pub fn new(
703        base_url: Option<String>,
704        timeout_secs: Option<u64>,
705        proxy_url: Option<String>,
706        is_testnet: bool,
707        retry_config: Option<RetryConfig>,
708    ) -> anyhow::Result<Self> {
709        Ok(Self {
710            inner: Arc::new(DydxRawHttpClient::new(
711                base_url,
712                timeout_secs,
713                proxy_url,
714                is_testnet,
715                retry_config,
716            )?),
717            instruments_cache: Arc::new(DashMap::new()),
718            clob_pair_id_to_instrument: Arc::new(DashMap::new()),
719            cache_initialized: AtomicBool::new(false),
720        })
721    }
722
723    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
724    ///
725    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
726    /// for automatic caching, or call `cache_instruments()` manually with the results.
727    ///
728    /// # Errors
729    ///
730    /// Returns an error if the HTTP request or parsing fails.
731    /// Individual instrument parsing errors are logged as warnings.
732    pub async fn request_instruments(
733        &self,
734        symbol: Option<String>,
735        maker_fee: Option<rust_decimal::Decimal>,
736        taker_fee: Option<rust_decimal::Decimal>,
737    ) -> anyhow::Result<Vec<InstrumentAny>> {
738        use nautilus_core::time::get_atomic_clock_realtime;
739
740        let markets_response = self.inner.get_markets().await?;
741        let ts_init = get_atomic_clock_realtime().get_time_ns();
742
743        let mut instruments = Vec::new();
744        let mut skipped = 0;
745
746        for (ticker, market) in markets_response.markets {
747            // Filter by symbol if specified
748            if let Some(ref sym) = symbol
749                && ticker != *sym
750            {
751                continue;
752            }
753
754            // Parse using http/parse.rs
755            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
756                Ok(instrument) => {
757                    instruments.push(instrument);
758                }
759                Err(e) => {
760                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
761                    skipped += 1;
762                }
763            }
764        }
765
766        if skipped > 0 {
767            tracing::info!(
768                "Parsed {} instruments, skipped {} (inactive or invalid)",
769                instruments.len(),
770                skipped
771            );
772        } else {
773            tracing::debug!("Parsed {} instruments", instruments.len());
774        }
775
776        Ok(instruments)
777    }
778
779    /// Fetches instruments from the API and caches them.
780    ///
781    /// This is a convenience method that fetches instruments and populates both
782    /// the symbol-based and CLOB pair ID-based caches.
783    ///
784    /// # Errors
785    ///
786    /// Returns an error if the HTTP request or parsing fails.
787    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
788        use nautilus_core::time::get_atomic_clock_realtime;
789
790        self.instruments_cache.clear();
791        self.clob_pair_id_to_instrument.clear();
792
793        let markets_response = self.inner.get_markets().await?;
794        let ts_init = get_atomic_clock_realtime().get_time_ns();
795
796        let mut instruments = Vec::new();
797        let mut skipped = 0;
798
799        for (ticker, market) in markets_response.markets {
800            // Parse using http/parse.rs
801            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
802                Ok(instrument) => {
803                    let instrument_id = instrument.id();
804                    let symbol = instrument_id.symbol.inner();
805                    self.instruments_cache.insert(symbol, instrument.clone());
806
807                    // Also cache by clob_pair_id for efficient WebSocket lookups
808                    self.clob_pair_id_to_instrument
809                        .insert(market.clob_pair_id, instrument_id);
810
811                    instruments.push(instrument);
812                }
813                Err(e) => {
814                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
815                    skipped += 1;
816                }
817            }
818        }
819
820        if !instruments.is_empty() {
821            self.cache_initialized.store(true, Ordering::Release);
822        }
823
824        if skipped > 0 {
825            tracing::info!(
826                "Cached {} instruments, skipped {} (inactive or invalid)",
827                instruments.len(),
828                skipped
829            );
830        } else {
831            tracing::info!("Cached {} instruments", instruments.len());
832        }
833
834        Ok(())
835    }
836
837    /// Caches multiple instruments.
838    ///
839    /// Any existing instruments with the same symbols will be replaced.
840    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
841        for inst in instruments {
842            let symbol = inst.id().symbol.inner();
843            self.instruments_cache.insert(symbol, inst);
844        }
845        self.cache_initialized.store(true, Ordering::Release);
846    }
847
848    /// Caches a single instrument.
849    ///
850    /// Any existing instrument with the same symbol will be replaced.
851    pub fn cache_instrument(&self, instrument: InstrumentAny) {
852        let symbol = instrument.id().symbol.inner();
853        self.instruments_cache.insert(symbol, instrument);
854        self.cache_initialized.store(true, Ordering::Release);
855    }
856
857    /// Gets an instrument from the cache by symbol.
858    #[must_use]
859    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
860        self.instruments_cache
861            .get(symbol)
862            .map(|entry| entry.clone())
863    }
864
865    /// Gets an instrument by CLOB pair ID.
866    ///
867    /// This uses the internal clob_pair_id mapping populated during `fetch_and_cache_instruments()`.
868    #[must_use]
869    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
870        // First get the InstrumentId from clob_pair_id mapping
871        let instrument_id = self
872            .clob_pair_id_to_instrument
873            .get(&clob_pair_id)
874            .map(|entry| *entry)?;
875
876        // Then look up the full instrument by symbol
877        self.get_instrument(&instrument_id.symbol.inner())
878    }
879
880    /// Requests historical trades for a symbol.
881    ///
882    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
883    /// Results are ordered by creation time descending (newest first).
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if the HTTP request fails or response cannot be parsed.
888    pub async fn request_trades(
889        &self,
890        symbol: &str,
891        limit: Option<u32>,
892    ) -> anyhow::Result<super::models::TradesResponse> {
893        self.inner
894            .get_trades(symbol, limit)
895            .await
896            .map_err(Into::into)
897    }
898
899    /// Requests historical candles for a symbol.
900    ///
901    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
902    /// Results are ordered by start time ascending (oldest first).
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if the HTTP request fails or response cannot be parsed.
907    pub async fn request_candles(
908        &self,
909        symbol: &str,
910        resolution: DydxCandleResolution,
911        limit: Option<u32>,
912        from_iso: Option<DateTime<Utc>>,
913        to_iso: Option<DateTime<Utc>>,
914    ) -> anyhow::Result<super::models::CandlesResponse> {
915        self.inner
916            .get_candles(symbol, resolution, limit, from_iso, to_iso)
917            .await
918            .map_err(Into::into)
919    }
920
921    /// Helper to get instrument or fetch if not cached.
922    ///
923    /// This is a convenience method that first checks the cache, and if the
924    /// instrument is not found, fetches it from the API. This is useful for
925    /// ensuring an instrument is available without explicitly managing the cache.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if:
930    /// - The HTTP request fails.
931    /// - The instrument is not found on the exchange.
932    ///
933    #[allow(dead_code)]
934    async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
935        if let Some(instrument) = self.get_instrument(&symbol) {
936            return Ok(instrument);
937        }
938
939        // Fetch from API
940        let instruments = self
941            .request_instruments(Some(symbol.to_string()), None, None)
942            .await?;
943
944        instruments
945            .into_iter()
946            .next()
947            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {symbol}"))
948    }
949
950    /// Exposes raw HTTP client for testing and advanced use cases.
951    ///
952    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
953    /// where low-level API access is needed. Most users should use the domain
954    /// client methods instead.
955    #[must_use]
956    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
957        &self.inner
958    }
959
960    /// Check if this client is configured for testnet.
961    #[must_use]
962    pub fn is_testnet(&self) -> bool {
963        self.inner.is_testnet()
964    }
965
966    /// Get the base URL being used by this client.
967    #[must_use]
968    pub fn base_url(&self) -> &str {
969        self.inner.base_url()
970    }
971
972    /// Check if the instrument cache has been initialized.
973    #[must_use]
974    pub fn is_cache_initialized(&self) -> bool {
975        self.cache_initialized.load(Ordering::Acquire)
976    }
977
978    /// Get the number of instruments currently cached.
979    #[must_use]
980    pub fn cached_instruments_count(&self) -> usize {
981        self.instruments_cache.len()
982    }
983
984    /// Returns a reference to the instruments cache.
985    #[must_use]
986    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
987        &self.instruments_cache
988    }
989
990    /// Get the mapping from CLOB pair ID to `InstrumentId`.
991    ///
992    /// This map is populated when instruments are fetched via `request_instruments` /
993    /// `cache_instruments()` using the Indexer `PerpetualMarket.clob_pair_id` field.
994    #[must_use]
995    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
996        &self.clob_pair_id_to_instrument
997    }
998}
999
1000////////////////////////////////////////////////////////////////////////////////
1001// Tests
1002////////////////////////////////////////////////////////////////////////////////
1003
1004#[cfg(test)]
1005mod tests {
1006    use nautilus_core::UnixNanos;
1007    use rstest::rstest;
1008
1009    use super::*;
1010    use crate::http::error;
1011
1012    // ========================================================================
1013    // Raw Client Tests
1014    // ========================================================================
1015
1016    #[tokio::test]
1017    async fn test_raw_client_creation() {
1018        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1019        assert!(client.is_ok());
1020
1021        let client = client.unwrap();
1022        assert!(!client.is_testnet());
1023        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1024    }
1025
1026    #[tokio::test]
1027    async fn test_raw_client_testnet() {
1028        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1029        assert!(client.is_ok());
1030
1031        let client = client.unwrap();
1032        assert!(client.is_testnet());
1033        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1034    }
1035
1036    // ========================================================================
1037    // Domain Client Tests
1038    // ========================================================================
1039
1040    #[tokio::test]
1041    async fn test_domain_client_creation() {
1042        let client = DydxHttpClient::new(None, Some(30), None, false, None);
1043        assert!(client.is_ok());
1044
1045        let client = client.unwrap();
1046        assert!(!client.is_testnet());
1047        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1048        assert!(!client.is_cache_initialized());
1049        assert_eq!(client.cached_instruments_count(), 0);
1050    }
1051
1052    #[tokio::test]
1053    async fn test_domain_client_testnet() {
1054        let client = DydxHttpClient::new(None, Some(30), None, true, None);
1055        assert!(client.is_ok());
1056
1057        let client = client.unwrap();
1058        assert!(client.is_testnet());
1059        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1060    }
1061
1062    #[tokio::test]
1063    async fn test_domain_client_default() {
1064        let client = DydxHttpClient::default();
1065        assert!(!client.is_testnet());
1066        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1067        assert!(!client.is_cache_initialized());
1068    }
1069
1070    #[tokio::test]
1071    async fn test_domain_client_clone() {
1072        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1073
1074        // Clone before initialization
1075        let cloned = client.clone();
1076        assert!(!cloned.is_cache_initialized());
1077
1078        // Simulate cache initialization
1079        client.cache_initialized.store(true, Ordering::Release);
1080
1081        // Clone after initialization
1082        #[allow(clippy::redundant_clone)]
1083        let cloned_after = client.clone();
1084        assert!(cloned_after.is_cache_initialized());
1085    }
1086
1087    #[rstest]
1088    fn test_domain_client_cache_instrument() {
1089        use nautilus_model::{
1090            identifiers::{InstrumentId, Symbol},
1091            instruments::CryptoPerpetual,
1092            types::{Currency, Price, Quantity},
1093        };
1094
1095        let client = DydxHttpClient::default();
1096        assert_eq!(client.cached_instruments_count(), 0);
1097
1098        // Create a test instrument
1099        let instrument_id =
1100            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
1101        let price = Price::from("1.0");
1102        let size = Quantity::from("0.001");
1103        let instrument = CryptoPerpetual::new(
1104            instrument_id,
1105            Symbol::from("BTC-USD"),
1106            Currency::BTC(),
1107            Currency::USD(),
1108            Currency::USD(),
1109            false,
1110            price.precision,
1111            size.precision,
1112            price,
1113            size,
1114            None,
1115            None,
1116            None,
1117            None,
1118            None,
1119            None,
1120            None,
1121            None,
1122            None,
1123            None,
1124            None,
1125            None,
1126            UnixNanos::default(),
1127            UnixNanos::default(),
1128        );
1129
1130        // Cache the instrument
1131        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
1132        assert_eq!(client.cached_instruments_count(), 1);
1133        assert!(client.is_cache_initialized());
1134
1135        // Retrieve it
1136        let btc_usd = Ustr::from("BTC-USD");
1137        let cached = client.get_instrument(&btc_usd);
1138        assert!(cached.is_some());
1139    }
1140
1141    #[rstest]
1142    fn test_domain_client_get_instrument_not_found() {
1143        let client = DydxHttpClient::default();
1144        let eth_usd = Ustr::from("ETH-USD");
1145        let result = client.get_instrument(&eth_usd);
1146        assert!(result.is_none());
1147    }
1148
1149    #[tokio::test]
1150    async fn test_http_timeout_respects_configuration_and_does_not_block() {
1151        use axum::{Router, routing::get};
1152        use tokio::net::TcpListener;
1153
1154        async fn slow_handler() -> &'static str {
1155            // Sleep longer than the configured HTTP timeout.
1156            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1157            "ok"
1158        }
1159
1160        let router = Router::new().route("/v4/slow", get(slow_handler));
1161
1162        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1163        let addr = listener.local_addr().unwrap();
1164
1165        tokio::spawn(async move {
1166            axum::serve(listener, router.into_make_service())
1167                .await
1168                .unwrap();
1169        });
1170
1171        let base_url = format!("http://{}", addr);
1172
1173        // Configure a small operation timeout and no retries so the request
1174        // fails quickly even though the handler sleeps for 5 seconds.
1175        let retry_config = RetryConfig {
1176            max_retries: 0,
1177            initial_delay_ms: 0,
1178            max_delay_ms: 0,
1179            backoff_factor: 1.0,
1180            jitter_ms: 0,
1181            operation_timeout_ms: Some(500),
1182            immediate_first: true,
1183            max_elapsed_ms: Some(1_000),
1184        };
1185
1186        // Keep HTTP client timeout at a typical value; rely on RetryManager
1187        // operation timeout to enforce non-blocking behavior.
1188        let client =
1189            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1190                .unwrap();
1191
1192        let start = std::time::Instant::now();
1193        let result: Result<serde_json::Value, error::DydxHttpError> =
1194            client.send_request(Method::GET, "/v4/slow", None).await;
1195        let elapsed = start.elapsed();
1196
1197        // Request should fail (timeout or client error), but without blocking the thread
1198        // for the full handler duration.
1199        assert!(result.is_err());
1200        assert!(elapsed < std::time::Duration::from_secs(3));
1201    }
1202}