nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Key Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
34//! - Zero-copy deserialization of large JSON payloads into domain models.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official documentation
46//!
47//! | Endpoint                             | Reference                                              |
48//! |--------------------------------------|--------------------------------------------------------|
49//! | Market data                          | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets> |
50//! | Account data                         | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints                    | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility> |
52
53use std::{
54    collections::HashMap,
55    fmt::{Debug, Formatter},
56    num::NonZeroU32,
57    sync::{
58        Arc, LazyLock,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67    data::{
68        Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69        bar::get_bar_interval_ns,
70    },
71    enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72    identifiers::{AccountId, InstrumentId, TradeId},
73    instruments::{Instrument, InstrumentAny},
74    reports::{FillReport, OrderStatusReport, PositionStatusReport},
75    types::{Price, Quantity},
76};
77use nautilus_network::{
78    http::{HttpClient, Method, USER_AGENT},
79    ratelimiter::quota::Quota,
80    retry::{RetryConfig, RetryManager},
81};
82use serde::{Deserialize, Serialize, de::DeserializeOwned};
83use tokio_util::sync::CancellationToken;
84use ustr::Ustr;
85
86use super::error::DydxHttpError;
87use crate::common::{
88    consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
89    enums::DydxCandleResolution,
90    parse::extract_raw_symbol,
91};
92
93/// Default dYdX Indexer REST API rate limit.
94///
95/// The dYdX Indexer API rate limits are generous for read-only operations:
96/// - General: 100 requests per 10 seconds per IP
97/// - We use a conservative 10 requests per second as the default quota.
98pub static DYDX_REST_QUOTA: LazyLock<Quota> =
99    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
100
101/// Represents a dYdX HTTP response wrapper.
102///
103/// Most dYdX Indexer API endpoints return data directly without a wrapper,
104/// but some endpoints may use this structure for consistency.
105#[derive(Debug, Serialize, Deserialize)]
106pub struct DydxResponse<T> {
107    /// The typed data returned by the dYdX endpoint.
108    pub data: T,
109}
110
111/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
112///
113/// This client wraps the underlying [`HttpClient`] to handle functionality
114/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
115/// and deserializing responses into dYdX specific data models.
116///
117/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
118/// does NOT require authentication, API keys, or request signing. All endpoints are
119/// publicly accessible.
120pub struct DydxRawHttpClient {
121    base_url: String,
122    client: HttpClient,
123    retry_manager: RetryManager<DydxHttpError>,
124    cancellation_token: CancellationToken,
125    is_testnet: bool,
126}
127
128impl Default for DydxRawHttpClient {
129    fn default() -> Self {
130        Self::new(None, Some(60), None, false, None)
131            .expect("Failed to create default DydxRawHttpClient")
132    }
133}
134
135impl Debug for DydxRawHttpClient {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct(stringify!(DydxRawHttpClient))
138            .field("base_url", &self.base_url)
139            .field("is_testnet", &self.is_testnet)
140            .finish_non_exhaustive()
141    }
142}
143
144impl DydxRawHttpClient {
145    /// Cancel all pending HTTP requests.
146    pub fn cancel_all_requests(&self) {
147        self.cancellation_token.cancel();
148    }
149
150    /// Get the cancellation token for this client.
151    pub fn cancellation_token(&self) -> &CancellationToken {
152        &self.cancellation_token
153    }
154
155    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
156    /// optionally overridden with a custom base URL.
157    ///
158    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the retry manager cannot be created.
163    pub fn new(
164        base_url: Option<String>,
165        timeout_secs: Option<u64>,
166        proxy_url: Option<String>,
167        is_testnet: bool,
168        retry_config: Option<RetryConfig>,
169    ) -> anyhow::Result<Self> {
170        let base_url = if is_testnet {
171            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
172        } else {
173            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
174        };
175
176        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
177
178        // Build headers
179        let mut headers = HashMap::new();
180        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182        let client = HttpClient::new(
183            headers,
184            vec![], // No specific headers to extract from responses
185            vec![], // No keyed quotas (we use a single global quota)
186            Some(*DYDX_REST_QUOTA),
187            timeout_secs,
188            proxy_url,
189        )
190        .map_err(|e| {
191            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192        })?;
193
194        Ok(Self {
195            base_url,
196            client,
197            retry_manager,
198            cancellation_token: CancellationToken::new(),
199            is_testnet,
200        })
201    }
202
203    /// Check if this client is configured for testnet.
204    #[must_use]
205    pub const fn is_testnet(&self) -> bool {
206        self.is_testnet
207    }
208
209    /// Get the base URL being used by this client.
210    #[must_use]
211    pub fn base_url(&self) -> &str {
212        &self.base_url
213    }
214
215    /// Send a request to a dYdX Indexer API endpoint.
216    ///
217    /// **Note**: dYdX Indexer API does not require authentication headers.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if:
222    /// - The HTTP request fails.
223    /// - The response has a non-success HTTP status code.
224    /// - The response body cannot be deserialized to type `T`.
225    /// - The request is canceled.
226    pub async fn send_request<T>(
227        &self,
228        method: Method,
229        endpoint: &str,
230        query_params: Option<&str>,
231    ) -> Result<T, DydxHttpError>
232    where
233        T: DeserializeOwned,
234    {
235        let url = if let Some(params) = query_params {
236            format!("{}{endpoint}?{params}", self.base_url)
237        } else {
238            format!("{}{endpoint}", self.base_url)
239        };
240
241        let operation = || async {
242            let request = self
243                .client
244                .request_with_ustr_keys(
245                    method.clone(),
246                    url.clone(),
247                    None, // No params
248                    None, // No additional headers
249                    None, // No body for GET requests
250                    None, // Use default timeout
251                    None, // No specific rate limit keys (using global quota)
252                )
253                .await
254                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256            // Check for HTTP errors
257            if !request.status.is_success() {
258                return Err(DydxHttpError::HttpStatus {
259                    status: request.status.as_u16(),
260                    message: String::from_utf8_lossy(&request.body).to_string(),
261                });
262            }
263
264            Ok(request)
265        };
266
267        // Retry strategy for dYdX Indexer API:
268        // 1. Network errors: always retry (transient connection issues)
269        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
270        // 3. Client errors (4xx except 429): should NOT be retried
271        let should_retry = |error: &DydxHttpError| -> bool {
272            match error {
273                DydxHttpError::HttpClientError(_) => true,
274                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
275                _ => false,
276            }
277        };
278
279        let create_error = |msg: String| -> DydxHttpError {
280            if msg == "canceled" {
281                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
282            } else {
283                DydxHttpError::ValidationError(msg)
284            }
285        };
286
287        // Execute request with retry logic
288        let response = self
289            .retry_manager
290            .execute_with_retry_with_cancel(
291                endpoint,
292                operation,
293                should_retry,
294                create_error,
295                &self.cancellation_token,
296            )
297            .await?;
298
299        // Deserialize response
300        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
301            error: e.to_string(),
302            body: String::from_utf8_lossy(&response.body).to_string(),
303        })
304    }
305
306    /// Send a POST request to a dYdX Indexer API endpoint.
307    ///
308    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if:
313    /// - The request body cannot be serialized to JSON.
314    /// - The HTTP request fails.
315    /// - The response has a non-success HTTP status code.
316    /// - The response body cannot be deserialized to type `T`.
317    /// - The request is canceled.
318    pub async fn send_post_request<T, B>(
319        &self,
320        endpoint: &str,
321        body: &B,
322    ) -> Result<T, DydxHttpError>
323    where
324        T: DeserializeOwned,
325        B: Serialize,
326    {
327        let url = format!("{}{endpoint}", self.base_url);
328
329        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
330            error: e.to_string(),
331        })?;
332
333        let operation = || async {
334            let request = self
335                .client
336                .request_with_ustr_keys(
337                    Method::POST,
338                    url.clone(),
339                    None, // No params
340                    None, // No additional headers (content-type handled by body)
341                    Some(body_bytes.clone()),
342                    None, // Use default timeout
343                    None, // No specific rate limit keys (using global quota)
344                )
345                .await
346                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
347
348            // Check for HTTP errors
349            if !request.status.is_success() {
350                return Err(DydxHttpError::HttpStatus {
351                    status: request.status.as_u16(),
352                    message: String::from_utf8_lossy(&request.body).to_string(),
353                });
354            }
355
356            Ok(request)
357        };
358
359        // Retry strategy (same as GET requests)
360        let should_retry = |error: &DydxHttpError| -> bool {
361            match error {
362                DydxHttpError::HttpClientError(_) => true,
363                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
364                _ => false,
365            }
366        };
367
368        let create_error = |msg: String| -> DydxHttpError {
369            if msg == "canceled" {
370                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
371            } else {
372                DydxHttpError::ValidationError(msg)
373            }
374        };
375
376        // Execute request with retry logic
377        let response = self
378            .retry_manager
379            .execute_with_retry_with_cancel(
380                endpoint,
381                operation,
382                should_retry,
383                create_error,
384                &self.cancellation_token,
385            )
386            .await?;
387
388        // Deserialize response
389        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
390            error: e.to_string(),
391            body: String::from_utf8_lossy(&response.body).to_string(),
392        })
393    }
394
395    // ========================================================================
396    // Markets Endpoints
397    // ========================================================================
398
399    /// Fetch all perpetual markets from dYdX.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the HTTP request fails or response parsing fails.
404    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
405        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
406            .await
407    }
408
409    /// Fetch all instruments and parse them into Nautilus `InstrumentAny` types.
410    ///
411    /// This method fetches all perpetual markets from dYdX and converts them
412    /// into Nautilus instrument definitions using the `parse_instrument_any` function.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if:
417    /// - The HTTP request fails.
418    /// - The response cannot be parsed.
419    /// - Any instrument parsing fails.
420    ///
421    pub async fn fetch_instruments(
422        &self,
423        maker_fee: Option<rust_decimal::Decimal>,
424        taker_fee: Option<rust_decimal::Decimal>,
425    ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
426        use nautilus_core::time::get_atomic_clock_realtime;
427
428        let markets_response = self.get_markets().await?;
429        let ts_init = get_atomic_clock_realtime().get_time_ns();
430
431        let mut instruments = Vec::new();
432        let mut skipped = 0;
433
434        for (ticker, market) in markets_response.markets {
435            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
436                Ok(instrument) => {
437                    instruments.push(instrument);
438                }
439                Err(e) => {
440                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
441                    skipped += 1;
442                }
443            }
444        }
445
446        if skipped > 0 {
447            tracing::info!(
448                "Parsed {} instruments, skipped {} (inactive or invalid)",
449                instruments.len(),
450                skipped
451            );
452        } else {
453            tracing::info!("Parsed {} instruments", instruments.len());
454        }
455
456        Ok(instruments)
457    }
458
459    // ========================================================================
460    // Account Endpoints
461    // ========================================================================
462
463    /// Fetch orderbook for a specific market.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the HTTP request fails or response parsing fails.
468    pub async fn get_orderbook(
469        &self,
470        ticker: &str,
471    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
472        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
473        self.send_request(Method::GET, &endpoint, None).await
474    }
475
476    /// Fetch recent trades for a market.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if the HTTP request fails or response parsing fails.
481    pub async fn get_trades(
482        &self,
483        ticker: &str,
484        limit: Option<u32>,
485    ) -> Result<super::models::TradesResponse, DydxHttpError> {
486        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
487        let query = limit.map(|l| format!("limit={l}"));
488        self.send_request(Method::GET, &endpoint, query.as_deref())
489            .await
490    }
491
492    /// Fetch candles/klines for a market.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the HTTP request fails or response parsing fails.
497    pub async fn get_candles(
498        &self,
499        ticker: &str,
500        resolution: DydxCandleResolution,
501        limit: Option<u32>,
502        from_iso: Option<DateTime<Utc>>,
503        to_iso: Option<DateTime<Utc>>,
504    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
505        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
506        let mut query_parts = vec![format!("resolution={}", resolution)];
507        if let Some(l) = limit {
508            query_parts.push(format!("limit={l}"));
509        }
510        if let Some(from) = from_iso {
511            let from_str = from.to_rfc3339();
512            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
513        }
514        if let Some(to) = to_iso {
515            let to_str = to.to_rfc3339();
516            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
517        }
518        let query = query_parts.join("&");
519        self.send_request(Method::GET, &endpoint, Some(&query))
520            .await
521    }
522
523    // ========================================================================
524    // Account Endpoints
525    // ========================================================================
526
527    /// Fetch subaccount information.
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the HTTP request fails or response parsing fails.
532    pub async fn get_subaccount(
533        &self,
534        address: &str,
535        subaccount_number: u32,
536    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
537        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
538        self.send_request(Method::GET, &endpoint, None).await
539    }
540
541    /// Fetch fills for a subaccount.
542    ///
543    /// # Errors
544    ///
545    /// Returns an error if the HTTP request fails or response parsing fails.
546    pub async fn get_fills(
547        &self,
548        address: &str,
549        subaccount_number: u32,
550        market: Option<&str>,
551        limit: Option<u32>,
552    ) -> Result<super::models::FillsResponse, DydxHttpError> {
553        let endpoint = "/v4/fills";
554        let mut query_parts = vec![
555            format!("address={address}"),
556            format!("subaccountNumber={subaccount_number}"),
557        ];
558        if let Some(m) = market {
559            query_parts.push(format!("market={m}"));
560        }
561        if let Some(l) = limit {
562            query_parts.push(format!("limit={l}"));
563        }
564        let query = query_parts.join("&");
565        self.send_request(Method::GET, endpoint, Some(&query)).await
566    }
567
568    /// Fetch orders for a subaccount.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the HTTP request fails or response parsing fails.
573    pub async fn get_orders(
574        &self,
575        address: &str,
576        subaccount_number: u32,
577        market: Option<&str>,
578        limit: Option<u32>,
579    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
580        let endpoint = "/v4/orders";
581        let mut query_parts = vec![
582            format!("address={address}"),
583            format!("subaccountNumber={subaccount_number}"),
584        ];
585        if let Some(m) = market {
586            query_parts.push(format!("market={m}"));
587        }
588        if let Some(l) = limit {
589            query_parts.push(format!("limit={l}"));
590        }
591        let query = query_parts.join("&");
592        self.send_request(Method::GET, endpoint, Some(&query)).await
593    }
594
595    /// Fetch transfers for a subaccount.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if the HTTP request fails or response parsing fails.
600    pub async fn get_transfers(
601        &self,
602        address: &str,
603        subaccount_number: u32,
604        limit: Option<u32>,
605    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
606        let endpoint = "/v4/transfers";
607        let mut query_parts = vec![
608            format!("address={address}"),
609            format!("subaccountNumber={subaccount_number}"),
610        ];
611        if let Some(l) = limit {
612            query_parts.push(format!("limit={l}"));
613        }
614        let query = query_parts.join("&");
615        self.send_request(Method::GET, endpoint, Some(&query)).await
616    }
617
618    // ========================================================================
619    // Utility Endpoints
620    // ========================================================================
621
622    /// Get current server time.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the HTTP request fails or response parsing fails.
627    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
628        self.send_request(Method::GET, "/v4/time", None).await
629    }
630
631    /// Get current blockchain height.
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the HTTP request fails or response parsing fails.
636    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
637        self.send_request(Method::GET, "/v4/height", None).await
638    }
639}
640
641/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
642///
643/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
644/// into the Nautilus domain model, following the two-layer pattern established
645/// in OKX, Bybit, and BitMEX adapters.
646///
647/// **Architecture:**
648/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
649/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
650///
651/// The domain client:
652/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
653/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
654/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
655/// - Tracks cache initialization state for optimizations.
656#[derive(Debug)]
657#[cfg_attr(
658    feature = "python",
659    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
660)]
661pub struct DydxHttpClient {
662    /// Raw HTTP client wrapped in Arc for efficient cloning.
663    pub(crate) inner: Arc<DydxRawHttpClient>,
664    /// Instrument cache shared across the adapter using DashMap for thread-safe access.
665    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
666    /// Cached mapping from CLOB pair ID → InstrumentId for efficient lookups.
667    ///
668    /// This is populated from HTTP PerpetualMarket metadata (`clob_pair_id`) alongside
669    /// instrument creation to avoid re-deriving IDs from symbols or other heuristics.
670    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
671    /// Cached mapping from InstrumentId → PerpetualMarket for market params extraction.
672    ///
673    /// This stores the raw market data from the HTTP API for later extraction of
674    /// quantization parameters (atomic_resolution, subticks_per_tick, etc.) needed
675    /// for order submission.
676    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
677    /// Tracks whether the instrument cache has been initialized.
678    cache_initialized: AtomicBool,
679}
680
681impl Clone for DydxHttpClient {
682    fn clone(&self) -> Self {
683        let cache_initialized = AtomicBool::new(false);
684        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
685        if is_initialized {
686            cache_initialized.store(true, Ordering::Release);
687        }
688
689        Self {
690            inner: self.inner.clone(),
691            instruments_cache: self.instruments_cache.clone(),
692            clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
693            market_params_cache: self.market_params_cache.clone(),
694            cache_initialized,
695        }
696    }
697}
698
699impl Default for DydxHttpClient {
700    fn default() -> Self {
701        Self::new(None, Some(60), None, false, None)
702            .expect("Failed to create default DydxHttpClient")
703    }
704}
705
706impl DydxHttpClient {
707    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
708    /// optionally overridden with a custom base URL.
709    ///
710    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
711    /// Order submission and trading operations use gRPC with blockchain transaction signing.
712    ///
713    /// # Errors
714    ///
715    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
716    pub fn new(
717        base_url: Option<String>,
718        timeout_secs: Option<u64>,
719        proxy_url: Option<String>,
720        is_testnet: bool,
721        retry_config: Option<RetryConfig>,
722    ) -> anyhow::Result<Self> {
723        Ok(Self {
724            inner: Arc::new(DydxRawHttpClient::new(
725                base_url,
726                timeout_secs,
727                proxy_url,
728                is_testnet,
729                retry_config,
730            )?),
731            instruments_cache: Arc::new(DashMap::new()),
732            clob_pair_id_to_instrument: Arc::new(DashMap::new()),
733            market_params_cache: Arc::new(DashMap::new()),
734            cache_initialized: AtomicBool::new(false),
735        })
736    }
737
738    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
739    ///
740    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
741    /// for automatic caching, or call `cache_instruments()` manually with the results.
742    ///
743    /// # Errors
744    ///
745    /// Returns an error if the HTTP request or parsing fails.
746    /// Individual instrument parsing errors are logged as warnings.
747    pub async fn request_instruments(
748        &self,
749        symbol: Option<String>,
750        maker_fee: Option<rust_decimal::Decimal>,
751        taker_fee: Option<rust_decimal::Decimal>,
752    ) -> anyhow::Result<Vec<InstrumentAny>> {
753        use nautilus_core::time::get_atomic_clock_realtime;
754
755        let markets_response = self.inner.get_markets().await?;
756        let ts_init = get_atomic_clock_realtime().get_time_ns();
757
758        let mut instruments = Vec::new();
759        let mut skipped = 0;
760
761        for (ticker, market) in markets_response.markets {
762            // Filter by symbol if specified
763            if let Some(ref sym) = symbol
764                && ticker != *sym
765            {
766                continue;
767            }
768
769            // Parse using http/parse.rs
770            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
771                Ok(instrument) => {
772                    instruments.push(instrument);
773                }
774                Err(e) => {
775                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
776                    skipped += 1;
777                }
778            }
779        }
780
781        if skipped > 0 {
782            tracing::info!(
783                "Parsed {} instruments, skipped {} (inactive or invalid)",
784                instruments.len(),
785                skipped
786            );
787        } else {
788            tracing::debug!("Parsed {} instruments", instruments.len());
789        }
790
791        Ok(instruments)
792    }
793
794    /// Fetches instruments from the API and caches them.
795    ///
796    /// This is a convenience method that fetches instruments and populates both
797    /// the symbol-based and CLOB pair ID-based caches.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the HTTP request or parsing fails.
802    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
803        use nautilus_core::time::get_atomic_clock_realtime;
804
805        self.instruments_cache.clear();
806        self.clob_pair_id_to_instrument.clear();
807        self.market_params_cache.clear();
808
809        let markets_response = self.inner.get_markets().await?;
810        let ts_init = get_atomic_clock_realtime().get_time_ns();
811
812        let mut instruments = Vec::new();
813        let mut skipped = 0;
814
815        for (ticker, market) in markets_response.markets {
816            // Parse using http/parse.rs
817            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
818                Ok(instrument) => {
819                    let instrument_id = instrument.id();
820                    let symbol = instrument_id.symbol.inner();
821                    self.instruments_cache.insert(symbol, instrument.clone());
822
823                    // Also cache by clob_pair_id for efficient WebSocket lookups
824                    self.clob_pair_id_to_instrument
825                        .insert(market.clob_pair_id, instrument_id);
826
827                    // Cache raw market data for market params extraction
828                    self.market_params_cache.insert(instrument_id, market);
829
830                    instruments.push(instrument);
831                }
832                Err(e) => {
833                    tracing::warn!("Failed to parse instrument {ticker}: {e}");
834                    skipped += 1;
835                }
836            }
837        }
838
839        if !instruments.is_empty() {
840            self.cache_initialized.store(true, Ordering::Release);
841        }
842
843        if skipped > 0 {
844            tracing::info!(
845                "Cached {} instruments, skipped {} (inactive or invalid)",
846                instruments.len(),
847                skipped
848            );
849        } else {
850            tracing::info!("Cached {} instruments", instruments.len());
851        }
852
853        Ok(())
854    }
855
856    /// Caches multiple instruments.
857    ///
858    /// Any existing instruments with the same symbols will be replaced.
859    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
860        for inst in instruments {
861            let symbol = inst.id().symbol.inner();
862            self.instruments_cache.insert(symbol, inst);
863        }
864        self.cache_initialized.store(true, Ordering::Release);
865    }
866
867    /// Caches a single instrument.
868    ///
869    /// Any existing instrument with the same symbol will be replaced.
870    pub fn cache_instrument(&self, instrument: InstrumentAny) {
871        let symbol = instrument.id().symbol.inner();
872        self.instruments_cache.insert(symbol, instrument);
873        self.cache_initialized.store(true, Ordering::Release);
874    }
875
876    /// Gets an instrument from the cache by symbol.
877    #[must_use]
878    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
879        self.instruments_cache
880            .get(symbol)
881            .map(|entry| entry.clone())
882    }
883
884    /// Gets an instrument by CLOB pair ID.
885    ///
886    /// This uses the internal clob_pair_id mapping populated during `fetch_and_cache_instruments()`.
887    #[must_use]
888    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
889        // First get the InstrumentId from clob_pair_id mapping
890        let instrument_id = self
891            .clob_pair_id_to_instrument
892            .get(&clob_pair_id)
893            .map(|entry| *entry)?;
894
895        // Then look up the full instrument by symbol
896        self.get_instrument(&instrument_id.symbol.inner())
897    }
898
899    /// Gets market parameters for order submission from the cached market data.
900    ///
901    /// Returns the quantization parameters needed by OrderBuilder to construct
902    /// properly formatted orders for the dYdX v4 protocol.
903    ///
904    /// # Errors
905    ///
906    /// Returns None if the instrument is not found in the market params cache.
907    #[must_use]
908    pub fn get_market_params(
909        &self,
910        instrument_id: &InstrumentId,
911    ) -> Option<super::models::PerpetualMarket> {
912        self.market_params_cache
913            .get(instrument_id)
914            .map(|entry| entry.clone())
915    }
916
917    /// Requests historical trades for a symbol.
918    ///
919    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
920    /// Results are ordered by creation time descending (newest first).
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if the HTTP request fails or response cannot be parsed.
925    pub async fn request_trades(
926        &self,
927        symbol: &str,
928        limit: Option<u32>,
929    ) -> anyhow::Result<super::models::TradesResponse> {
930        self.inner
931            .get_trades(symbol, limit)
932            .await
933            .map_err(Into::into)
934    }
935
936    /// Requests historical candles for a symbol.
937    ///
938    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
939    /// Results are ordered by start time ascending (oldest first).
940    ///
941    /// # Errors
942    ///
943    /// Returns an error if the HTTP request fails or response cannot be parsed.
944    pub async fn request_candles(
945        &self,
946        symbol: &str,
947        resolution: DydxCandleResolution,
948        limit: Option<u32>,
949        from_iso: Option<DateTime<Utc>>,
950        to_iso: Option<DateTime<Utc>>,
951    ) -> anyhow::Result<super::models::CandlesResponse> {
952        self.inner
953            .get_candles(symbol, resolution, limit, from_iso, to_iso)
954            .await
955            .map_err(Into::into)
956    }
957
958    /// Requests historical bars for a symbol and converts to Nautilus Bar objects.
959    ///
960    /// Fetches candle data and converts to Nautilus `Bar` objects using the
961    /// provided `BarType`. Results are ordered by timestamp ascending (oldest first).
962    ///
963    /// # Errors
964    ///
965    /// Returns an error if the HTTP request fails, response cannot be parsed,
966    /// or the instrument is not found in the cache.
967    pub async fn request_bars(
968        &self,
969        bar_type: BarType,
970        resolution: DydxCandleResolution,
971        limit: Option<u32>,
972        from_iso: Option<DateTime<Utc>>,
973        to_iso: Option<DateTime<Utc>>,
974    ) -> anyhow::Result<Vec<Bar>> {
975        let instrument_id = bar_type.instrument_id();
976        let symbol = instrument_id.symbol;
977
978        // Get instrument for precision info
979        let instrument = self
980            .get_instrument(&symbol.inner())
981            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
982
983        // dYdX API expects ticker format "BTC-USD", not "BTC-USD-PERP"
984        let ticker = extract_raw_symbol(symbol.as_str());
985        let response = self
986            .request_candles(ticker, resolution, limit, from_iso, to_iso)
987            .await?;
988
989        let ts_init = get_atomic_clock_realtime().get_time_ns();
990        let interval_ns = get_bar_interval_ns(&bar_type);
991
992        let mut bars = Vec::with_capacity(response.candles.len());
993
994        for candle in response.candles {
995            // Calculate ts_event: startedAt + interval (end of bar)
996            let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
997                anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
998            })?;
999            let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
1000
1001            let bar = Bar::new(
1002                bar_type,
1003                Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1004                Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1005                Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1006                Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1007                Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1008                ts_event,
1009                ts_init,
1010            );
1011
1012            bars.push(bar);
1013        }
1014
1015        Ok(bars)
1016    }
1017
1018    /// Requests historical trade ticks for a symbol.
1019    ///
1020    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1021    /// `TradeTick` objects. Results are ordered by timestamp descending (newest first).
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns an error if the HTTP request fails, response cannot be parsed,
1026    /// or the instrument is not found in the cache.
1027    pub async fn request_trade_ticks(
1028        &self,
1029        instrument_id: InstrumentId,
1030        limit: Option<u32>,
1031    ) -> anyhow::Result<Vec<TradeTick>> {
1032        let symbol = instrument_id.symbol;
1033
1034        let instrument = self
1035            .get_instrument(&symbol.inner())
1036            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1037
1038        let ticker = extract_raw_symbol(symbol.as_str());
1039        let response = self.request_trades(ticker, limit).await?;
1040
1041        let ts_init = get_atomic_clock_realtime().get_time_ns();
1042
1043        let mut trades = Vec::with_capacity(response.trades.len());
1044
1045        for trade in response.trades {
1046            let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1047                anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1048            })?;
1049            let ts_event = UnixNanos::from(ts_event_nanos as u64);
1050
1051            let aggressor_side = match trade.side {
1052                NautilusOrderSide::Buy => AggressorSide::Buyer,
1053                NautilusOrderSide::Sell => AggressorSide::Seller,
1054                NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1055            };
1056
1057            let trade_tick = TradeTick::new(
1058                instrument_id,
1059                Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1060                Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1061                aggressor_side,
1062                TradeId::new(&trade.id),
1063                ts_event,
1064                ts_init,
1065            );
1066
1067            trades.push(trade_tick);
1068        }
1069
1070        Ok(trades)
1071    }
1072
1073    /// Requests an order book snapshot for a symbol.
1074    ///
1075    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1076    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1077    /// with a CLEAR action followed by ADD actions for each level.
1078    ///
1079    /// # Errors
1080    ///
1081    /// Returns an error if the HTTP request fails, response cannot be parsed,
1082    /// or the instrument is not found in the cache.
1083    pub async fn request_orderbook_snapshot(
1084        &self,
1085        instrument_id: InstrumentId,
1086    ) -> anyhow::Result<OrderBookDeltas> {
1087        let symbol = instrument_id.symbol;
1088
1089        let instrument = self
1090            .get_instrument(&symbol.inner())
1091            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1092
1093        let ticker = extract_raw_symbol(symbol.as_str());
1094        let response = self.inner.get_orderbook(ticker).await?;
1095
1096        let ts_init = get_atomic_clock_realtime().get_time_ns();
1097
1098        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1099
1100        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1101
1102        for (i, level) in response.bids.iter().enumerate() {
1103            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1104            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1105
1106            let order = BookOrder::new(
1107                NautilusOrderSide::Buy,
1108                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1109                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1110                0,
1111            );
1112
1113            deltas.push(OrderBookDelta::new(
1114                instrument_id,
1115                BookAction::Add,
1116                order,
1117                flags,
1118                0,
1119                ts_init,
1120                ts_init,
1121            ));
1122        }
1123
1124        for (i, level) in response.asks.iter().enumerate() {
1125            let is_last = i == response.asks.len() - 1;
1126            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1127
1128            let order = BookOrder::new(
1129                NautilusOrderSide::Sell,
1130                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1131                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1132                0,
1133            );
1134
1135            deltas.push(OrderBookDelta::new(
1136                instrument_id,
1137                BookAction::Add,
1138                order,
1139                flags,
1140                0,
1141                ts_init,
1142                ts_init,
1143            ));
1144        }
1145
1146        Ok(OrderBookDeltas::new(instrument_id, deltas))
1147    }
1148
1149    /// Exposes raw HTTP client for testing and advanced use cases.
1150    ///
1151    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
1152    /// where low-level API access is needed. Most users should use the domain
1153    /// client methods instead.
1154    #[must_use]
1155    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1156        &self.inner
1157    }
1158
1159    /// Check if this client is configured for testnet.
1160    #[must_use]
1161    pub fn is_testnet(&self) -> bool {
1162        self.inner.is_testnet()
1163    }
1164
1165    /// Get the base URL being used by this client.
1166    #[must_use]
1167    pub fn base_url(&self) -> &str {
1168        self.inner.base_url()
1169    }
1170
1171    /// Check if the instrument cache has been initialized.
1172    #[must_use]
1173    pub fn is_cache_initialized(&self) -> bool {
1174        self.cache_initialized.load(Ordering::Acquire)
1175    }
1176
1177    /// Get the number of instruments currently cached.
1178    #[must_use]
1179    pub fn cached_instruments_count(&self) -> usize {
1180        self.instruments_cache.len()
1181    }
1182
1183    /// Returns a reference to the instruments cache.
1184    #[must_use]
1185    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
1186        &self.instruments_cache
1187    }
1188
1189    /// Get the mapping from CLOB pair ID to `InstrumentId`.
1190    ///
1191    /// This map is populated when instruments are fetched via `request_instruments` /
1192    /// `cache_instruments()` using the Indexer `PerpetualMarket.clob_pair_id` field.
1193    #[must_use]
1194    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
1195        &self.clob_pair_id_to_instrument
1196    }
1197
1198    /// Requests order status reports for a subaccount.
1199    ///
1200    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1201    /// `OrderStatusReport` objects.
1202    ///
1203    /// # Errors
1204    ///
1205    /// Returns an error if the HTTP request fails or parsing fails.
1206    pub async fn request_order_status_reports(
1207        &self,
1208        address: &str,
1209        subaccount_number: u32,
1210        account_id: AccountId,
1211        instrument_id: Option<InstrumentId>,
1212    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1213        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1214
1215        // Convert instrument_id to market filter
1216        let market = instrument_id.map(|id| {
1217            let symbol = id.symbol.to_string();
1218            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1219            symbol.trim_end_matches("-PERP").to_string()
1220        });
1221
1222        let orders = self
1223            .inner
1224            .get_orders(address, subaccount_number, market.as_deref(), None)
1225            .await?;
1226
1227        let mut reports = Vec::new();
1228
1229        for order in orders {
1230            // Get instrument by clob_pair_id
1231            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1232                Some(inst) => inst,
1233                None => {
1234                    tracing::warn!(
1235                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1236                        order.id,
1237                        order.clob_pair_id
1238                    );
1239                    continue;
1240                }
1241            };
1242
1243            // Filter by instrument_id if specified
1244            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1245                continue;
1246            }
1247
1248            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1249            {
1250                Ok(report) => reports.push(report),
1251                Err(e) => {
1252                    tracing::warn!("Failed to parse order {}: {e}", order.id);
1253                }
1254            }
1255        }
1256
1257        Ok(reports)
1258    }
1259
1260    /// Requests fill reports for a subaccount.
1261    ///
1262    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1263    /// `FillReport` objects.
1264    ///
1265    /// # Errors
1266    ///
1267    /// Returns an error if the HTTP request fails or parsing fails.
1268    pub async fn request_fill_reports(
1269        &self,
1270        address: &str,
1271        subaccount_number: u32,
1272        account_id: AccountId,
1273        instrument_id: Option<InstrumentId>,
1274    ) -> anyhow::Result<Vec<FillReport>> {
1275        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1276
1277        // Convert instrument_id to market filter
1278        let market = instrument_id.map(|id| {
1279            let symbol = id.symbol.to_string();
1280            symbol.trim_end_matches("-PERP").to_string()
1281        });
1282
1283        let fills_response = self
1284            .inner
1285            .get_fills(address, subaccount_number, market.as_deref(), None)
1286            .await?;
1287
1288        let mut reports = Vec::new();
1289
1290        for fill in fills_response.fills {
1291            // Get instrument by market ticker
1292            let market = &fill.market;
1293            let symbol = ustr::Ustr::from(&format!("{market}-PERP"));
1294            let instrument = match self.get_instrument(&symbol) {
1295                Some(inst) => inst,
1296                None => {
1297                    tracing::warn!(
1298                        "Skipping fill {}: no cached instrument for market {}",
1299                        fill.id,
1300                        fill.market
1301                    );
1302                    continue;
1303                }
1304            };
1305
1306            // Filter by instrument_id if specified
1307            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1308                continue;
1309            }
1310
1311            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1312                Ok(report) => reports.push(report),
1313                Err(e) => {
1314                    tracing::warn!("Failed to parse fill {}: {e}", fill.id);
1315                }
1316            }
1317        }
1318
1319        Ok(reports)
1320    }
1321
1322    /// Requests position status reports for a subaccount.
1323    ///
1324    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1325    /// `PositionStatusReport` objects.
1326    ///
1327    /// # Errors
1328    ///
1329    /// Returns an error if the HTTP request fails or parsing fails.
1330    pub async fn request_position_status_reports(
1331        &self,
1332        address: &str,
1333        subaccount_number: u32,
1334        account_id: AccountId,
1335        instrument_id: Option<InstrumentId>,
1336    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1337        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1338
1339        let subaccount_response = self
1340            .inner
1341            .get_subaccount(address, subaccount_number)
1342            .await?;
1343
1344        let mut reports = Vec::new();
1345
1346        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1347            // Get instrument by market ticker
1348            let symbol = ustr::Ustr::from(&format!("{market}-PERP"));
1349            let instrument = match self.get_instrument(&symbol) {
1350                Some(inst) => inst,
1351                None => {
1352                    tracing::warn!(
1353                        "Skipping position: no cached instrument for market {}",
1354                        market
1355                    );
1356                    continue;
1357                }
1358            };
1359
1360            // Filter by instrument_id if specified
1361            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1362                continue;
1363            }
1364
1365            match super::parse::parse_position_status_report(
1366                &position,
1367                &instrument,
1368                account_id,
1369                ts_init,
1370            ) {
1371                Ok(report) => reports.push(report),
1372                Err(e) => {
1373                    tracing::warn!("Failed to parse position for {}: {e}", market);
1374                }
1375            }
1376        }
1377
1378        Ok(reports)
1379    }
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384    use nautilus_core::UnixNanos;
1385    use rstest::rstest;
1386
1387    use super::*;
1388    use crate::http::error;
1389
1390    #[tokio::test]
1391    async fn test_raw_client_creation() {
1392        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1393        assert!(client.is_ok());
1394
1395        let client = client.unwrap();
1396        assert!(!client.is_testnet());
1397        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1398    }
1399
1400    #[tokio::test]
1401    async fn test_raw_client_testnet() {
1402        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1403        assert!(client.is_ok());
1404
1405        let client = client.unwrap();
1406        assert!(client.is_testnet());
1407        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1408    }
1409
1410    #[tokio::test]
1411    async fn test_domain_client_creation() {
1412        let client = DydxHttpClient::new(None, Some(30), None, false, None);
1413        assert!(client.is_ok());
1414
1415        let client = client.unwrap();
1416        assert!(!client.is_testnet());
1417        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1418        assert!(!client.is_cache_initialized());
1419        assert_eq!(client.cached_instruments_count(), 0);
1420    }
1421
1422    #[tokio::test]
1423    async fn test_domain_client_testnet() {
1424        let client = DydxHttpClient::new(None, Some(30), None, true, None);
1425        assert!(client.is_ok());
1426
1427        let client = client.unwrap();
1428        assert!(client.is_testnet());
1429        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1430    }
1431
1432    #[tokio::test]
1433    async fn test_domain_client_default() {
1434        let client = DydxHttpClient::default();
1435        assert!(!client.is_testnet());
1436        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1437        assert!(!client.is_cache_initialized());
1438    }
1439
1440    #[tokio::test]
1441    async fn test_domain_client_clone() {
1442        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1443
1444        // Clone before initialization
1445        let cloned = client.clone();
1446        assert!(!cloned.is_cache_initialized());
1447
1448        // Simulate cache initialization
1449        client.cache_initialized.store(true, Ordering::Release);
1450
1451        // Clone after initialization
1452        #[allow(clippy::redundant_clone)]
1453        let cloned_after = client.clone();
1454        assert!(cloned_after.is_cache_initialized());
1455    }
1456
1457    #[rstest]
1458    fn test_domain_client_cache_instrument() {
1459        use nautilus_model::{
1460            identifiers::{InstrumentId, Symbol},
1461            instruments::CryptoPerpetual,
1462            types::{Currency, Price, Quantity},
1463        };
1464
1465        let client = DydxHttpClient::default();
1466        assert_eq!(client.cached_instruments_count(), 0);
1467
1468        // Create a test instrument
1469        let instrument_id =
1470            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
1471        let price = Price::from("1.0");
1472        let size = Quantity::from("0.001");
1473        let instrument = CryptoPerpetual::new(
1474            instrument_id,
1475            Symbol::from("BTC-USD"),
1476            Currency::BTC(),
1477            Currency::USD(),
1478            Currency::USD(),
1479            false,
1480            price.precision,
1481            size.precision,
1482            price,
1483            size,
1484            None,
1485            None,
1486            None,
1487            None,
1488            None,
1489            None,
1490            None,
1491            None,
1492            None,
1493            None,
1494            None,
1495            None,
1496            UnixNanos::default(),
1497            UnixNanos::default(),
1498        );
1499
1500        // Cache the instrument
1501        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
1502        assert_eq!(client.cached_instruments_count(), 1);
1503        assert!(client.is_cache_initialized());
1504
1505        // Retrieve it
1506        let btc_usd = Ustr::from("BTC-USD");
1507        let cached = client.get_instrument(&btc_usd);
1508        assert!(cached.is_some());
1509    }
1510
1511    #[rstest]
1512    fn test_domain_client_get_instrument_not_found() {
1513        let client = DydxHttpClient::default();
1514        let eth_usd = Ustr::from("ETH-USD");
1515        let result = client.get_instrument(&eth_usd);
1516        assert!(result.is_none());
1517    }
1518
1519    #[tokio::test]
1520    async fn test_http_timeout_respects_configuration_and_does_not_block() {
1521        use axum::{Router, routing::get};
1522        use tokio::net::TcpListener;
1523
1524        async fn slow_handler() -> &'static str {
1525            // Sleep longer than the configured HTTP timeout.
1526            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1527            "ok"
1528        }
1529
1530        let router = Router::new().route("/v4/slow", get(slow_handler));
1531
1532        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1533        let addr = listener.local_addr().unwrap();
1534
1535        tokio::spawn(async move {
1536            axum::serve(listener, router.into_make_service())
1537                .await
1538                .unwrap();
1539        });
1540
1541        let base_url = format!("http://{addr}");
1542
1543        // Configure a small operation timeout and no retries so the request
1544        // fails quickly even though the handler sleeps for 5 seconds.
1545        let retry_config = RetryConfig {
1546            max_retries: 0,
1547            initial_delay_ms: 0,
1548            max_delay_ms: 0,
1549            backoff_factor: 1.0,
1550            jitter_ms: 0,
1551            operation_timeout_ms: Some(500),
1552            immediate_first: true,
1553            max_elapsed_ms: Some(1_000),
1554        };
1555
1556        // Keep HTTP client timeout at a typical value; rely on RetryManager
1557        // operation timeout to enforce non-blocking behavior.
1558        let client =
1559            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1560                .unwrap();
1561
1562        let start = std::time::Instant::now();
1563        let result: Result<serde_json::Value, error::DydxHttpError> =
1564            client.send_request(Method::GET, "/v4/slow", None).await;
1565        let elapsed = start.elapsed();
1566
1567        // Request should fail (timeout or client error), but without blocking the thread
1568        // for the full handler duration.
1569        assert!(result.is_err());
1570        assert!(elapsed < std::time::Duration::from_secs(3));
1571    }
1572}