nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Key Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
//! - Deserialization of JSON payloads into owned, typed response models via `serde`.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official Documentation
46//!
47//! | Endpoint                             | Reference                                              |
48//! |--------------------------------------|--------------------------------------------------------|
49//! | Market data                          | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets> |
50//! | Account data                         | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints                    | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility> |
52
53use std::{
54    collections::HashMap,
55    fmt::Debug,
56    num::NonZeroU32,
57    sync::{
58        Arc, LazyLock,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67    data::{
68        Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69        bar::get_bar_interval_ns,
70    },
71    enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72    identifiers::{AccountId, InstrumentId, TradeId},
73    instruments::{Instrument, InstrumentAny},
74    reports::{FillReport, OrderStatusReport, PositionStatusReport},
75    types::{Price, Quantity},
76};
77use nautilus_network::{
78    http::{HttpClient, Method, USER_AGENT},
79    ratelimiter::quota::Quota,
80    retry::{RetryConfig, RetryManager},
81};
82use rust_decimal::Decimal;
83use serde::{Deserialize, Serialize, de::DeserializeOwned};
84use tokio_util::sync::CancellationToken;
85use ustr::Ustr;
86
87use super::error::DydxHttpError;
88use crate::common::{
89    consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
90    enums::DydxCandleResolution,
91    parse::extract_raw_symbol,
92};
93
94/// Default dYdX Indexer REST API rate limit.
95///
96/// The dYdX Indexer API rate limits are generous for read-only operations:
97/// - General: 100 requests per 10 seconds per IP
98/// - We use a conservative 10 requests per second as the default quota.
99pub static DYDX_REST_QUOTA: LazyLock<Quota> =
100    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
101
/// Generic envelope for dYdX HTTP responses that nest their payload under a `data` key.
///
/// Most dYdX Indexer API endpoints return data directly without a wrapper,
/// but some endpoints may use this structure for consistency.
///
/// `T` is the payload type; it is (de)serialized through the derived `serde` impls.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The typed data returned by the dYdX endpoint.
    pub data: T,
}
111
/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
/// and deserializing responses into dYdX specific data models.
///
/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
/// does NOT require authentication, API keys, or request signing. All endpoints are
/// publicly accessible.
pub struct DydxRawHttpClient {
    /// Base URL that every endpoint path is appended to when building request URLs.
    base_url: String,
    /// Underlying HTTP client, constructed with the global [`DYDX_REST_QUOTA`].
    client: HttpClient,
    /// Retry manager that drives each request attempt and decides when to retry.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token handed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Whether the client was configured for testnet (selects the default base URL).
    is_testnet: bool,
}
128
129impl Default for DydxRawHttpClient {
130    fn default() -> Self {
131        Self::new(None, Some(60), None, false, None)
132            .expect("Failed to create default DydxRawHttpClient")
133    }
134}
135
136impl Debug for DydxRawHttpClient {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct(stringify!(DydxRawHttpClient))
139            .field("base_url", &self.base_url)
140            .field("is_testnet", &self.is_testnet)
141            .finish_non_exhaustive()
142    }
143}
144
145impl DydxRawHttpClient {
146    /// Cancel all pending HTTP requests.
147    pub fn cancel_all_requests(&self) {
148        self.cancellation_token.cancel();
149    }
150
    /// Returns a reference to the cancellation token used by this client.
    ///
    /// The same token is passed to the retry manager for every request sent
    /// through this client.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
155
156    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
157    /// optionally overridden with a custom base URL.
158    ///
159    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the retry manager cannot be created.
164    pub fn new(
165        base_url: Option<String>,
166        timeout_secs: Option<u64>,
167        proxy_url: Option<String>,
168        is_testnet: bool,
169        retry_config: Option<RetryConfig>,
170    ) -> anyhow::Result<Self> {
171        let base_url = if is_testnet {
172            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
173        } else {
174            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
175        };
176
177        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
178
179        let mut headers = HashMap::new();
180        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182        let client = HttpClient::new(
183            headers,
184            vec![], // No specific headers to extract from responses
185            vec![], // No keyed quotas (we use a single global quota)
186            Some(*DYDX_REST_QUOTA),
187            timeout_secs,
188            proxy_url,
189        )
190        .map_err(|e| {
191            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192        })?;
193
194        Ok(Self {
195            base_url,
196            client,
197            retry_manager,
198            cancellation_token: CancellationToken::new(),
199            is_testnet,
200        })
201    }
202
    /// Returns `true` if this client was constructed with `is_testnet` set.
    #[must_use]
    pub const fn is_testnet(&self) -> bool {
        self.is_testnet
    }
208
209    /// Get the base URL being used by this client.
210    #[must_use]
211    pub fn base_url(&self) -> &str {
212        &self.base_url
213    }
214
215    /// Send a request to a dYdX Indexer API endpoint.
216    ///
217    /// **Note**: dYdX Indexer API does not require authentication headers.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if:
222    /// - The HTTP request fails.
223    /// - The response has a non-success HTTP status code.
224    /// - The response body cannot be deserialized to type `T`.
225    /// - The request is canceled.
226    pub async fn send_request<T>(
227        &self,
228        method: Method,
229        endpoint: &str,
230        query_params: Option<&str>,
231    ) -> Result<T, DydxHttpError>
232    where
233        T: DeserializeOwned,
234    {
235        let url = if let Some(params) = query_params {
236            format!("{}{endpoint}?{params}", self.base_url)
237        } else {
238            format!("{}{endpoint}", self.base_url)
239        };
240
241        let operation = || async {
242            let request = self
243                .client
244                .request_with_ustr_keys(
245                    method.clone(),
246                    url.clone(),
247                    None, // No params
248                    None, // No additional headers
249                    None, // No body for GET requests
250                    None, // Use default timeout
251                    None, // No specific rate limit keys (using global quota)
252                )
253                .await
254                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256            if !request.status.is_success() {
257                return Err(DydxHttpError::HttpStatus {
258                    status: request.status.as_u16(),
259                    message: String::from_utf8_lossy(&request.body).to_string(),
260                });
261            }
262
263            Ok(request)
264        };
265
266        // Retry strategy for dYdX Indexer API:
267        // 1. Network errors: always retry (transient connection issues)
268        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
269        // 3. Client errors (4xx except 429): should NOT be retried
270        let should_retry = |error: &DydxHttpError| -> bool {
271            match error {
272                DydxHttpError::HttpClientError(_) => true,
273                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
274                _ => false,
275            }
276        };
277
278        let create_error = |msg: String| -> DydxHttpError {
279            if msg == "canceled" {
280                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
281            } else {
282                DydxHttpError::ValidationError(msg)
283            }
284        };
285
286        let response = self
287            .retry_manager
288            .execute_with_retry_with_cancel(
289                endpoint,
290                operation,
291                should_retry,
292                create_error,
293                &self.cancellation_token,
294            )
295            .await?;
296
297        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
298            error: e.to_string(),
299            body: String::from_utf8_lossy(&response.body).to_string(),
300        })
301    }
302
303    /// Send a POST request to a dYdX Indexer API endpoint.
304    ///
305    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if:
310    /// - The request body cannot be serialized to JSON.
311    /// - The HTTP request fails.
312    /// - The response has a non-success HTTP status code.
313    /// - The response body cannot be deserialized to type `T`.
314    /// - The request is canceled.
315    pub async fn send_post_request<T, B>(
316        &self,
317        endpoint: &str,
318        body: &B,
319    ) -> Result<T, DydxHttpError>
320    where
321        T: DeserializeOwned,
322        B: Serialize,
323    {
324        let url = format!("{}{endpoint}", self.base_url);
325
326        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
327            error: e.to_string(),
328        })?;
329
330        let operation = || async {
331            let request = self
332                .client
333                .request_with_ustr_keys(
334                    Method::POST,
335                    url.clone(),
336                    None, // No params
337                    None, // No additional headers (content-type handled by body)
338                    Some(body_bytes.clone()),
339                    None, // Use default timeout
340                    None, // No specific rate limit keys (using global quota)
341                )
342                .await
343                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
344
345            if !request.status.is_success() {
346                return Err(DydxHttpError::HttpStatus {
347                    status: request.status.as_u16(),
348                    message: String::from_utf8_lossy(&request.body).to_string(),
349                });
350            }
351
352            Ok(request)
353        };
354
355        // Retry strategy (same as GET requests)
356        let should_retry = |error: &DydxHttpError| -> bool {
357            match error {
358                DydxHttpError::HttpClientError(_) => true,
359                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
360                _ => false,
361            }
362        };
363
364        let create_error = |msg: String| -> DydxHttpError {
365            if msg == "canceled" {
366                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
367            } else {
368                DydxHttpError::ValidationError(msg)
369            }
370        };
371
372        let response = self
373            .retry_manager
374            .execute_with_retry_with_cancel(
375                endpoint,
376                operation,
377                should_retry,
378                create_error,
379                &self.cancellation_token,
380            )
381            .await?;
382
383        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
384            error: e.to_string(),
385            body: String::from_utf8_lossy(&response.body).to_string(),
386        })
387    }
388
389    /// Fetch all perpetual markets from dYdX.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the HTTP request fails or response parsing fails.
394    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
395        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
396            .await
397    }
398
399    /// Fetch all instruments and parse them into Nautilus `InstrumentAny` types.
400    ///
401    /// This method fetches all perpetual markets from dYdX and converts them
402    /// into Nautilus instrument definitions using the `parse_instrument_any` function.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if:
407    /// - The HTTP request fails.
408    /// - The response cannot be parsed.
409    /// - Any instrument parsing fails.
410    ///
411    pub async fn fetch_instruments(
412        &self,
413        maker_fee: Option<Decimal>,
414        taker_fee: Option<Decimal>,
415    ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
416        let markets_response = self.get_markets().await?;
417        let ts_init = get_atomic_clock_realtime().get_time_ns();
418
419        let mut instruments = Vec::new();
420        let mut skipped_inactive = 0;
421
422        for (ticker, market) in markets_response.markets {
423            if !super::parse::is_market_active(&market.status) {
424                log::debug!(
425                    "Skipping inactive market {ticker} (status: {:?})",
426                    market.status
427                );
428                skipped_inactive += 1;
429                continue;
430            }
431
432            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
433                Ok(instrument) => {
434                    instruments.push(instrument);
435                }
436                Err(e) => {
437                    log::error!("Failed to parse instrument {ticker}: {e}");
438                }
439            }
440        }
441
442        if skipped_inactive > 0 {
443            log::info!(
444                "Parsed {} instruments, skipped {} inactive",
445                instruments.len(),
446                skipped_inactive
447            );
448        } else {
449            log::info!("Parsed {} instruments", instruments.len());
450        }
451
452        Ok(instruments)
453    }
454
455    /// Fetch orderbook for a specific market.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the HTTP request fails or response parsing fails.
460    pub async fn get_orderbook(
461        &self,
462        ticker: &str,
463    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
464        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
465        self.send_request(Method::GET, &endpoint, None).await
466    }
467
468    /// Fetch recent trades for a market.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if the HTTP request fails or response parsing fails.
473    pub async fn get_trades(
474        &self,
475        ticker: &str,
476        limit: Option<u32>,
477    ) -> Result<super::models::TradesResponse, DydxHttpError> {
478        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
479        let query = limit.map(|l| format!("limit={l}"));
480        self.send_request(Method::GET, &endpoint, query.as_deref())
481            .await
482    }
483
484    /// Fetch candles/klines for a market.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the HTTP request fails or response parsing fails.
489    pub async fn get_candles(
490        &self,
491        ticker: &str,
492        resolution: DydxCandleResolution,
493        limit: Option<u32>,
494        from_iso: Option<DateTime<Utc>>,
495        to_iso: Option<DateTime<Utc>>,
496    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
497        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
498        let mut query_parts = vec![format!("resolution={resolution}")];
499        if let Some(l) = limit {
500            query_parts.push(format!("limit={l}"));
501        }
502        if let Some(from) = from_iso {
503            let from_str = from.to_rfc3339();
504            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
505        }
506        if let Some(to) = to_iso {
507            let to_str = to.to_rfc3339();
508            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
509        }
510        let query = query_parts.join("&");
511        self.send_request(Method::GET, &endpoint, Some(&query))
512            .await
513    }
514
515    /// Fetch subaccount information.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the HTTP request fails or response parsing fails.
520    pub async fn get_subaccount(
521        &self,
522        address: &str,
523        subaccount_number: u32,
524    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
525        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
526        self.send_request(Method::GET, &endpoint, None).await
527    }
528
529    /// Fetch fills for a subaccount.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if the HTTP request fails or response parsing fails.
534    pub async fn get_fills(
535        &self,
536        address: &str,
537        subaccount_number: u32,
538        market: Option<&str>,
539        limit: Option<u32>,
540    ) -> Result<super::models::FillsResponse, DydxHttpError> {
541        let endpoint = "/v4/fills";
542        let mut query_parts = vec![
543            format!("address={address}"),
544            format!("subaccountNumber={subaccount_number}"),
545        ];
546        if let Some(m) = market {
547            query_parts.push(format!("market={m}"));
548        }
549        if let Some(l) = limit {
550            query_parts.push(format!("limit={l}"));
551        }
552        let query = query_parts.join("&");
553        self.send_request(Method::GET, endpoint, Some(&query)).await
554    }
555
556    /// Fetch orders for a subaccount.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if the HTTP request fails or response parsing fails.
561    pub async fn get_orders(
562        &self,
563        address: &str,
564        subaccount_number: u32,
565        market: Option<&str>,
566        limit: Option<u32>,
567    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
568        let endpoint = "/v4/orders";
569        let mut query_parts = vec![
570            format!("address={address}"),
571            format!("subaccountNumber={subaccount_number}"),
572        ];
573        if let Some(m) = market {
574            query_parts.push(format!("market={m}"));
575        }
576        if let Some(l) = limit {
577            query_parts.push(format!("limit={l}"));
578        }
579        let query = query_parts.join("&");
580        self.send_request(Method::GET, endpoint, Some(&query)).await
581    }
582
583    /// Fetch transfers for a subaccount.
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if the HTTP request fails or response parsing fails.
588    pub async fn get_transfers(
589        &self,
590        address: &str,
591        subaccount_number: u32,
592        limit: Option<u32>,
593    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
594        let endpoint = "/v4/transfers";
595        let mut query_parts = vec![
596            format!("address={address}"),
597            format!("subaccountNumber={subaccount_number}"),
598        ];
599        if let Some(l) = limit {
600            query_parts.push(format!("limit={l}"));
601        }
602        let query = query_parts.join("&");
603        self.send_request(Method::GET, endpoint, Some(&query)).await
604    }
605
606    /// Get current server time.
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the HTTP request fails or response parsing fails.
611    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
612        self.send_request(Method::GET, "/v4/time", None).await
613    }
614
615    /// Get current blockchain height.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the HTTP request fails or response parsing fails.
620    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
621        self.send_request(Method::GET, "/v4/height", None).await
622    }
623}
624
/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
///
/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
/// into the Nautilus domain model, following the two-layer pattern established
/// in OKX, Bybit, and BitMEX adapters.
///
/// **Architecture:**
/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
///
/// The domain client:
/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
/// - Tracks cache initialization state for optimizations.
///
/// Cloning shares the raw client and all three caches (they are `Arc`s), while the
/// `cache_initialized` flag is copied by value at clone time.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxHttpClient {
    /// Raw HTTP client wrapped in Arc for efficient cloning.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Instrument cache shared across the adapter using DashMap for thread-safe access.
    ///
    /// Keyed by the instrument's symbol (`instrument.id().symbol.inner()`).
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Cached mapping from CLOB pair ID → InstrumentId for efficient lookups.
    ///
    /// This is populated from HTTP PerpetualMarket metadata (`clob_pair_id`) alongside
    /// instrument creation to avoid re-deriving IDs from symbols or other heuristics.
    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
    /// Cached mapping from InstrumentId → PerpetualMarket for market params extraction.
    ///
    /// This stores the raw market data from the HTTP API for later extraction of
    /// quantization parameters (atomic_resolution, subticks_per_tick, etc.) needed
    /// for order submission.
    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
    /// Tracks whether the instrument cache has been initialized.
    ///
    /// Unlike the caches, this flag is not behind an `Arc`: each clone carries its
    /// own copy, seeded from the source's value when it was cloned.
    cache_initialized: AtomicBool,
}
664
665impl Clone for DydxHttpClient {
666    fn clone(&self) -> Self {
667        let cache_initialized = AtomicBool::new(false);
668        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
669        if is_initialized {
670            cache_initialized.store(true, Ordering::Release);
671        }
672
673        Self {
674            inner: self.inner.clone(),
675            instruments_cache: self.instruments_cache.clone(),
676            clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
677            market_params_cache: self.market_params_cache.clone(),
678            cache_initialized,
679        }
680    }
681}
682
683impl Default for DydxHttpClient {
684    fn default() -> Self {
685        Self::new(None, Some(60), None, false, None)
686            .expect("Failed to create default DydxHttpClient")
687    }
688}
689
690impl DydxHttpClient {
691    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
692    /// optionally overridden with a custom base URL.
693    ///
694    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
695    /// Order submission and trading operations use gRPC with blockchain transaction signing.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
700    pub fn new(
701        base_url: Option<String>,
702        timeout_secs: Option<u64>,
703        proxy_url: Option<String>,
704        is_testnet: bool,
705        retry_config: Option<RetryConfig>,
706    ) -> anyhow::Result<Self> {
707        Ok(Self {
708            inner: Arc::new(DydxRawHttpClient::new(
709                base_url,
710                timeout_secs,
711                proxy_url,
712                is_testnet,
713                retry_config,
714            )?),
715            instruments_cache: Arc::new(DashMap::new()),
716            clob_pair_id_to_instrument: Arc::new(DashMap::new()),
717            market_params_cache: Arc::new(DashMap::new()),
718            cache_initialized: AtomicBool::new(false),
719        })
720    }
721
722    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
723    ///
724    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
725    /// for automatic caching, or call `cache_instruments()` manually with the results.
726    ///
727    /// # Errors
728    ///
729    /// Returns an error if the HTTP request or parsing fails.
730    /// Individual instrument parsing errors are logged as warnings.
731    pub async fn request_instruments(
732        &self,
733        symbol: Option<String>,
734        maker_fee: Option<Decimal>,
735        taker_fee: Option<Decimal>,
736    ) -> anyhow::Result<Vec<InstrumentAny>> {
737        let markets_response = self.inner.get_markets().await?;
738        let ts_init = get_atomic_clock_realtime().get_time_ns();
739
740        let mut instruments = Vec::new();
741        let mut skipped_inactive = 0;
742
743        for (ticker, market) in markets_response.markets {
744            // Filter by symbol if specified
745            if let Some(ref sym) = symbol
746                && ticker != *sym
747            {
748                continue;
749            }
750
751            if !super::parse::is_market_active(&market.status) {
752                log::debug!(
753                    "Skipping inactive market {ticker} (status: {:?})",
754                    market.status
755                );
756                skipped_inactive += 1;
757                continue;
758            }
759
760            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
761                Ok(instrument) => {
762                    instruments.push(instrument);
763                }
764                Err(e) => {
765                    log::error!("Failed to parse instrument {ticker}: {e}");
766                }
767            }
768        }
769
770        if skipped_inactive > 0 {
771            log::info!(
772                "Parsed {} instruments, skipped {} inactive",
773                instruments.len(),
774                skipped_inactive
775            );
776        } else {
777            log::debug!("Parsed {} instruments", instruments.len());
778        }
779
780        Ok(instruments)
781    }
782
783    /// Fetches instruments from the API and caches them.
784    ///
785    /// This is a convenience method that fetches instruments and populates both
786    /// the symbol-based and CLOB pair ID-based caches.
787    ///
788    /// On success, existing caches are cleared and repopulated atomically.
789    /// On failure, existing caches are preserved (no partial updates).
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the HTTP request fails.
794    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
795        // Fetch first - preserve existing cache on network failure
796        let markets_response = self.inner.get_markets().await?;
797        let ts_init = get_atomic_clock_realtime().get_time_ns();
798
799        let mut parsed_instruments = Vec::new();
800        let mut parsed_markets = Vec::new();
801        let mut skipped_inactive = 0;
802
803        for (ticker, market) in markets_response.markets {
804            if !super::parse::is_market_active(&market.status) {
805                log::debug!(
806                    "Skipping inactive market {ticker} (status: {:?})",
807                    market.status
808                );
809                skipped_inactive += 1;
810                continue;
811            }
812
813            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
814                Ok(instrument) => {
815                    parsed_instruments.push(instrument);
816                    parsed_markets.push(market);
817                }
818                Err(e) => {
819                    log::error!("Failed to parse instrument {ticker}: {e}");
820                }
821            }
822        }
823
824        // Only clear and repopulate caches after successful fetch and parse
825        self.instruments_cache.clear();
826        self.clob_pair_id_to_instrument.clear();
827        self.market_params_cache.clear();
828
829        for (instrument, market) in parsed_instruments.iter().zip(parsed_markets.into_iter()) {
830            let instrument_id = instrument.id();
831            let symbol = instrument_id.symbol.inner();
832            self.instruments_cache.insert(symbol, instrument.clone());
833            self.clob_pair_id_to_instrument
834                .insert(market.clob_pair_id, instrument_id);
835            self.market_params_cache.insert(instrument_id, market);
836        }
837
838        if !parsed_instruments.is_empty() {
839            self.cache_initialized.store(true, Ordering::Release);
840        }
841
842        if skipped_inactive > 0 {
843            log::info!(
844                "Cached {} instruments, skipped {} inactive",
845                parsed_instruments.len(),
846                skipped_inactive
847            );
848        } else {
849            log::info!("Cached {} instruments", parsed_instruments.len());
850        }
851
852        Ok(())
853    }
854
855    /// Caches multiple instruments.
856    ///
857    /// Any existing instruments with the same symbols will be replaced.
858    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
859        for inst in instruments {
860            let symbol = inst.id().symbol.inner();
861            self.instruments_cache.insert(symbol, inst);
862        }
863        self.cache_initialized.store(true, Ordering::Release);
864    }
865
866    /// Caches a single instrument.
867    ///
868    /// Any existing instrument with the same symbol will be replaced.
869    pub fn cache_instrument(&self, instrument: InstrumentAny) {
870        let symbol = instrument.id().symbol.inner();
871        self.instruments_cache.insert(symbol, instrument);
872        self.cache_initialized.store(true, Ordering::Release);
873    }
874
875    /// Gets an instrument from the cache by symbol.
876    #[must_use]
877    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
878        self.instruments_cache
879            .get(symbol)
880            .map(|entry| entry.clone())
881    }
882
883    /// Gets an instrument by CLOB pair ID.
884    ///
885    /// This uses the internal clob_pair_id mapping populated during `fetch_and_cache_instruments()`.
886    #[must_use]
887    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
888        // First get the InstrumentId from clob_pair_id mapping
889        let instrument_id = self
890            .clob_pair_id_to_instrument
891            .get(&clob_pair_id)
892            .map(|entry| *entry)?;
893
894        // Then look up the full instrument by symbol
895        self.get_instrument(&instrument_id.symbol.inner())
896    }
897
898    /// Gets market parameters for order submission from the cached market data.
899    ///
900    /// Returns the quantization parameters needed by OrderBuilder to construct
901    /// properly formatted orders for the dYdX v4 protocol.
902    ///
903    /// # Errors
904    ///
905    /// Returns None if the instrument is not found in the market params cache.
906    #[must_use]
907    pub fn get_market_params(
908        &self,
909        instrument_id: &InstrumentId,
910    ) -> Option<super::models::PerpetualMarket> {
911        self.market_params_cache
912            .get(instrument_id)
913            .map(|entry| entry.clone())
914    }
915
916    /// Requests historical trades for a symbol.
917    ///
918    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
919    /// Results are ordered by creation time descending (newest first).
920    ///
921    /// # Errors
922    ///
923    /// Returns an error if the HTTP request fails or response cannot be parsed.
924    pub async fn request_trades(
925        &self,
926        symbol: &str,
927        limit: Option<u32>,
928    ) -> anyhow::Result<super::models::TradesResponse> {
929        self.inner
930            .get_trades(symbol, limit)
931            .await
932            .map_err(Into::into)
933    }
934
935    /// Requests historical candles for a symbol.
936    ///
937    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
938    /// Results are ordered by start time ascending (oldest first).
939    ///
940    /// # Errors
941    ///
942    /// Returns an error if the HTTP request fails or response cannot be parsed.
943    pub async fn request_candles(
944        &self,
945        symbol: &str,
946        resolution: DydxCandleResolution,
947        limit: Option<u32>,
948        from_iso: Option<DateTime<Utc>>,
949        to_iso: Option<DateTime<Utc>>,
950    ) -> anyhow::Result<super::models::CandlesResponse> {
951        self.inner
952            .get_candles(symbol, resolution, limit, from_iso, to_iso)
953            .await
954            .map_err(Into::into)
955    }
956
957    /// Requests historical bars for a symbol and converts to Nautilus Bar objects.
958    ///
959    /// Fetches candle data and converts to Nautilus `Bar` objects using the
960    /// provided `BarType`. Results are ordered by timestamp ascending (oldest first).
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the HTTP request fails, response cannot be parsed,
965    /// or the instrument is not found in the cache.
966    pub async fn request_bars(
967        &self,
968        bar_type: BarType,
969        resolution: DydxCandleResolution,
970        limit: Option<u32>,
971        from_iso: Option<DateTime<Utc>>,
972        to_iso: Option<DateTime<Utc>>,
973    ) -> anyhow::Result<Vec<Bar>> {
974        let instrument_id = bar_type.instrument_id();
975        let symbol = instrument_id.symbol;
976
977        // Get instrument for precision info
978        let instrument = self
979            .get_instrument(&symbol.inner())
980            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
981
982        // dYdX API expects ticker format "BTC-USD", not "BTC-USD-PERP"
983        let ticker = extract_raw_symbol(symbol.as_str());
984        let response = self
985            .request_candles(ticker, resolution, limit, from_iso, to_iso)
986            .await?;
987
988        let ts_init = get_atomic_clock_realtime().get_time_ns();
989        let interval_ns = get_bar_interval_ns(&bar_type);
990
991        let mut bars = Vec::with_capacity(response.candles.len());
992
993        for candle in response.candles {
994            // Calculate ts_event: startedAt + interval (end of bar)
995            let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
996                anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
997            })?;
998            let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
999
1000            let bar = Bar::new(
1001                bar_type,
1002                Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1003                Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1004                Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1005                Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1006                Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1007                ts_event,
1008                ts_init,
1009            );
1010
1011            bars.push(bar);
1012        }
1013
1014        Ok(bars)
1015    }
1016
1017    /// Requests historical trade ticks for a symbol.
1018    ///
1019    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1020    /// `TradeTick` objects. Results are ordered by timestamp descending (newest first).
1021    ///
1022    /// # Errors
1023    ///
1024    /// Returns an error if the HTTP request fails, response cannot be parsed,
1025    /// or the instrument is not found in the cache.
1026    pub async fn request_trade_ticks(
1027        &self,
1028        instrument_id: InstrumentId,
1029        limit: Option<u32>,
1030    ) -> anyhow::Result<Vec<TradeTick>> {
1031        let symbol = instrument_id.symbol;
1032
1033        let instrument = self
1034            .get_instrument(&symbol.inner())
1035            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1036
1037        let ticker = extract_raw_symbol(symbol.as_str());
1038        let response = self.request_trades(ticker, limit).await?;
1039
1040        let ts_init = get_atomic_clock_realtime().get_time_ns();
1041
1042        let mut trades = Vec::with_capacity(response.trades.len());
1043
1044        for trade in response.trades {
1045            let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1046                anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1047            })?;
1048            let ts_event = UnixNanos::from(ts_event_nanos as u64);
1049
1050            let aggressor_side = match trade.side {
1051                NautilusOrderSide::Buy => AggressorSide::Buyer,
1052                NautilusOrderSide::Sell => AggressorSide::Seller,
1053                NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1054            };
1055
1056            let trade_tick = TradeTick::new(
1057                instrument_id,
1058                Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1059                Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1060                aggressor_side,
1061                TradeId::new(&trade.id),
1062                ts_event,
1063                ts_init,
1064            );
1065
1066            trades.push(trade_tick);
1067        }
1068
1069        Ok(trades)
1070    }
1071
1072    /// Requests an order book snapshot for a symbol.
1073    ///
1074    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1075    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1076    /// with a CLEAR action followed by ADD actions for each level.
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns an error if the HTTP request fails, response cannot be parsed,
1081    /// or the instrument is not found in the cache.
1082    pub async fn request_orderbook_snapshot(
1083        &self,
1084        instrument_id: InstrumentId,
1085    ) -> anyhow::Result<OrderBookDeltas> {
1086        let symbol = instrument_id.symbol;
1087
1088        let instrument = self
1089            .get_instrument(&symbol.inner())
1090            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1091
1092        let ticker = extract_raw_symbol(symbol.as_str());
1093        let response = self.inner.get_orderbook(ticker).await?;
1094
1095        let ts_init = get_atomic_clock_realtime().get_time_ns();
1096
1097        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1098
1099        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1100
1101        for (i, level) in response.bids.iter().enumerate() {
1102            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1103            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1104
1105            let order = BookOrder::new(
1106                NautilusOrderSide::Buy,
1107                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1108                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1109                0,
1110            );
1111
1112            deltas.push(OrderBookDelta::new(
1113                instrument_id,
1114                BookAction::Add,
1115                order,
1116                flags,
1117                0,
1118                ts_init,
1119                ts_init,
1120            ));
1121        }
1122
1123        for (i, level) in response.asks.iter().enumerate() {
1124            let is_last = i == response.asks.len() - 1;
1125            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1126
1127            let order = BookOrder::new(
1128                NautilusOrderSide::Sell,
1129                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1130                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1131                0,
1132            );
1133
1134            deltas.push(OrderBookDelta::new(
1135                instrument_id,
1136                BookAction::Add,
1137                order,
1138                flags,
1139                0,
1140                ts_init,
1141                ts_init,
1142            ));
1143        }
1144
1145        Ok(OrderBookDeltas::new(instrument_id, deltas))
1146    }
1147
1148    /// Exposes raw HTTP client for testing and advanced use cases.
1149    ///
1150    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
1151    /// where low-level API access is needed. Most users should use the domain
1152    /// client methods instead.
1153    #[must_use]
1154    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1155        &self.inner
1156    }
1157
1158    /// Check if this client is configured for testnet.
1159    #[must_use]
1160    pub fn is_testnet(&self) -> bool {
1161        self.inner.is_testnet()
1162    }
1163
1164    /// Get the base URL being used by this client.
1165    #[must_use]
1166    pub fn base_url(&self) -> &str {
1167        self.inner.base_url()
1168    }
1169
1170    /// Check if the instrument cache has been initialized.
1171    #[must_use]
1172    pub fn is_cache_initialized(&self) -> bool {
1173        self.cache_initialized.load(Ordering::Acquire)
1174    }
1175
1176    /// Get the number of instruments currently cached.
1177    #[must_use]
1178    pub fn cached_instruments_count(&self) -> usize {
1179        self.instruments_cache.len()
1180    }
1181
1182    /// Returns a reference to the instruments cache.
1183    #[must_use]
1184    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
1185        &self.instruments_cache
1186    }
1187
1188    /// Get the mapping from CLOB pair ID to `InstrumentId`.
1189    ///
1190    /// This map is populated when instruments are fetched via `request_instruments` /
1191    /// `cache_instruments()` using the Indexer `PerpetualMarket.clob_pair_id` field.
1192    #[must_use]
1193    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
1194        &self.clob_pair_id_to_instrument
1195    }
1196
1197    /// Requests order status reports for a subaccount.
1198    ///
1199    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1200    /// `OrderStatusReport` objects.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Returns an error if the HTTP request fails or parsing fails.
1205    pub async fn request_order_status_reports(
1206        &self,
1207        address: &str,
1208        subaccount_number: u32,
1209        account_id: AccountId,
1210        instrument_id: Option<InstrumentId>,
1211    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1212        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1213
1214        // Convert instrument_id to market filter
1215        let market = instrument_id.map(|id| {
1216            let symbol = id.symbol.to_string();
1217            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1218            symbol.trim_end_matches("-PERP").to_string()
1219        });
1220
1221        let orders = self
1222            .inner
1223            .get_orders(address, subaccount_number, market.as_deref(), None)
1224            .await?;
1225
1226        let mut reports = Vec::new();
1227
1228        for order in orders {
1229            // Get instrument by clob_pair_id
1230            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1231                Some(inst) => inst,
1232                None => {
1233                    log::warn!(
1234                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1235                        order.id,
1236                        order.clob_pair_id
1237                    );
1238                    continue;
1239                }
1240            };
1241
1242            // Filter by instrument_id if specified
1243            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1244                continue;
1245            }
1246
1247            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1248            {
1249                Ok(report) => reports.push(report),
1250                Err(e) => {
1251                    log::warn!("Failed to parse order {}: {e}", order.id);
1252                }
1253            }
1254        }
1255
1256        Ok(reports)
1257    }
1258
1259    /// Requests fill reports for a subaccount.
1260    ///
1261    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1262    /// `FillReport` objects.
1263    ///
1264    /// # Errors
1265    ///
1266    /// Returns an error if the HTTP request fails or parsing fails.
1267    pub async fn request_fill_reports(
1268        &self,
1269        address: &str,
1270        subaccount_number: u32,
1271        account_id: AccountId,
1272        instrument_id: Option<InstrumentId>,
1273    ) -> anyhow::Result<Vec<FillReport>> {
1274        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1275
1276        // Convert instrument_id to market filter
1277        let market = instrument_id.map(|id| {
1278            let symbol = id.symbol.to_string();
1279            symbol.trim_end_matches("-PERP").to_string()
1280        });
1281
1282        let fills_response = self
1283            .inner
1284            .get_fills(address, subaccount_number, market.as_deref(), None)
1285            .await?;
1286
1287        let mut reports = Vec::new();
1288
1289        for fill in fills_response.fills {
1290            // Get instrument by market ticker
1291            let market = &fill.market;
1292            let symbol = Ustr::from(&format!("{market}-PERP"));
1293            let instrument = match self.get_instrument(&symbol) {
1294                Some(inst) => inst,
1295                None => {
1296                    log::warn!(
1297                        "Skipping fill {}: no cached instrument for market {}",
1298                        fill.id,
1299                        fill.market
1300                    );
1301                    continue;
1302                }
1303            };
1304
1305            // Filter by instrument_id if specified
1306            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1307                continue;
1308            }
1309
1310            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1311                Ok(report) => reports.push(report),
1312                Err(e) => {
1313                    log::warn!("Failed to parse fill {}: {e}", fill.id);
1314                }
1315            }
1316        }
1317
1318        Ok(reports)
1319    }
1320
1321    /// Requests position status reports for a subaccount.
1322    ///
1323    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1324    /// `PositionStatusReport` objects.
1325    ///
1326    /// # Errors
1327    ///
1328    /// Returns an error if the HTTP request fails or parsing fails.
1329    pub async fn request_position_status_reports(
1330        &self,
1331        address: &str,
1332        subaccount_number: u32,
1333        account_id: AccountId,
1334        instrument_id: Option<InstrumentId>,
1335    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1336        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1337
1338        let subaccount_response = self
1339            .inner
1340            .get_subaccount(address, subaccount_number)
1341            .await?;
1342
1343        let mut reports = Vec::new();
1344
1345        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1346            // Get instrument by market ticker
1347            let symbol = Ustr::from(&format!("{market}-PERP"));
1348            let instrument = match self.get_instrument(&symbol) {
1349                Some(inst) => inst,
1350                None => {
1351                    log::warn!("Skipping position: no cached instrument for market {market}");
1352                    continue;
1353                }
1354            };
1355
1356            // Filter by instrument_id if specified
1357            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1358                continue;
1359            }
1360
1361            match super::parse::parse_position_status_report(
1362                &position,
1363                &instrument,
1364                account_id,
1365                ts_init,
1366            ) {
1367                Ok(report) => reports.push(report),
1368                Err(e) => {
1369                    log::warn!("Failed to parse position for {market}: {e}");
1370                }
1371            }
1372        }
1373
1374        Ok(reports)
1375    }
1376}
1377
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client built without the testnet flag points at the mainnet URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let built = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(!raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_HTTP_URL);
    }

    /// A raw client built with the testnet flag points at the testnet URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let built = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A fresh domain client targets mainnet and starts with an empty, uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let built = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
        assert_eq!(domain.cached_instruments_count(), 0);
    }

    /// A domain client built with the testnet flag points at the testnet URL.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let built = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// The `Default` domain client targets mainnet with an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let domain = DydxHttpClient::default();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
    }

    /// Clones report the cache-initialized state of the client they were cloned from.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let original = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        // A clone taken before initialization is uninitialized
        let early_clone = original.clone();
        assert!(!early_clone.is_cache_initialized());

        // Flip the flag directly to simulate a populated cache
        original.cache_initialized.store(true, Ordering::Release);

        // A clone taken afterwards sees the initialized flag
        #[allow(clippy::redundant_clone)]
        let late_clone = original.clone();
        assert!(late_clone.is_cache_initialized());
    }

    /// Caching a single instrument makes it retrievable and marks the cache initialized.
    #[rstest]
    fn test_domain_client_cache_instrument() {
        use nautilus_model::{
            identifiers::{InstrumentId, Symbol},
            instruments::CryptoPerpetual,
            types::{Currency, Price, Quantity},
        };

        let domain = DydxHttpClient::default();
        assert_eq!(domain.cached_instruments_count(), 0);

        // Build a minimal BTC-USD perpetual to cache
        let instrument_id =
            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
        let price_increment = Price::from("1.0");
        let size_increment = Quantity::from("0.001");
        let perpetual = CryptoPerpetual::new(
            instrument_id,
            Symbol::from("BTC-USD"),
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            price_increment.precision,
            size_increment.precision,
            price_increment,
            size_increment,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        // Store it
        domain.cache_instrument(InstrumentAny::CryptoPerpetual(perpetual));
        assert_eq!(domain.cached_instruments_count(), 1);
        assert!(domain.is_cache_initialized());

        // And read it back by symbol
        let lookup_key = Ustr::from("BTC-USD");
        let found = domain.get_instrument(&lookup_key);
        assert!(found.is_some());
    }

    /// Looking up a symbol that was never cached yields `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let domain = DydxHttpClient::default();
        let lookup_key = Ustr::from("ETH-USD");
        let found = domain.get_instrument(&lookup_key);
        assert!(found.is_none());
    }

    /// A request to a slow endpoint fails within the retry operation timeout instead of
    /// waiting for the handler to finish.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use std::time::{Duration, Instant};

        use axum::{Router, routing::get};
        use tokio::net::TcpListener;

        async fn slow_handler() -> &'static str {
            // Outlast the operation timeout configured below
            tokio::time::sleep(Duration::from_secs(5)).await;
            "ok"
        }

        let app = Router::new().route("/v4/slow", get(slow_handler));

        // Port 0 lets the OS pick a free port for the local test server
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let server_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, app.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{server_addr}");

        // Short operation timeout and zero retries, so the request gives up
        // long before the 5 second handler returns
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The HTTP client timeout stays at a typical value; the retry manager's
        // operation timeout is what is expected to cut the request short
        let raw = DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
            .unwrap();

        let started = Instant::now();
        let outcome: Result<serde_json::Value, error::DydxHttpError> =
            raw.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = started.elapsed();

        // The request must fail (timeout or client error) well before the handler
        // would have completed
        assert!(outcome.is_err());
        assert!(elapsed < Duration::from_secs(3));
    }
}