Skip to main content

nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
34//! - Zero-copy deserialization of large JSON payloads into domain models.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official Documentation
46//!
47//! | Endpoint                             | Reference                                              |
48//! |--------------------------------------|--------------------------------------------------------|
49//! | Market data                          | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets> |
50//! | Account data                         | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints                    | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility> |
52
53use std::{
54    collections::HashMap,
55    fmt::Debug,
56    num::NonZeroU32,
57    sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
62use nautilus_model::{
63    data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick},
64    enums::{
65        AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
66        RecordFlag,
67    },
68    identifiers::{AccountId, InstrumentId},
69    instruments::{Instrument, InstrumentAny},
70    reports::{FillReport, OrderStatusReport, PositionStatusReport},
71    types::{Price, Quantity},
72};
73use nautilus_network::{
74    http::{HttpClient, Method, USER_AGENT},
75    ratelimiter::quota::Quota,
76    retry::{RetryConfig, RetryManager},
77};
78use rust_decimal::Decimal;
79use serde::{Deserialize, Serialize, de::DeserializeOwned};
80use tokio_util::sync::CancellationToken;
81
82use super::error::DydxHttpError;
83use crate::{
84    common::{
85        consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
86        enums::DydxCandleResolution,
87        instrument_cache::InstrumentCache,
88        parse::extract_raw_symbol,
89    },
90    http::parse::parse_instrument_any,
91};
92
93/// Maximum number of candles returned per dYdX API request.
94const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
95
96fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
97    if bar_type.aggregation_source() != AggregationSource::External {
98        anyhow::bail!(
99            "dYdX only supports EXTERNAL aggregation, was {:?}",
100            bar_type.aggregation_source()
101        );
102    }
103
104    let spec = bar_type.spec();
105    if spec.price_type != PriceType::Last {
106        anyhow::bail!(
107            "dYdX only supports LAST price type, was {:?}",
108            spec.price_type
109        );
110    }
111
112    DydxCandleResolution::from_bar_spec(&spec)
113}
114
115/// Default dYdX Indexer REST API rate limit.
116///
117/// The dYdX Indexer API rate limits are generous for read-only operations:
118/// - General: 100 requests per 10 seconds per IP
119/// - We use a conservative 10 requests per second as the default quota.
120pub static DYDX_REST_QUOTA: LazyLock<Quota> =
121    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
122
123/// Represents a dYdX HTTP response wrapper.
124///
125/// Most dYdX Indexer API endpoints return data directly without a wrapper,
126/// but some endpoints may use this structure for consistency.
127#[derive(Debug, Serialize, Deserialize)]
128pub struct DydxResponse<T> {
129    /// The typed data returned by the dYdX endpoint.
130    pub data: T,
131}
132
133/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
134///
135/// This client wraps the underlying [`HttpClient`] to handle functionality
136/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
137/// and deserializing responses into dYdX specific data models.
138///
139/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
140/// does NOT require authentication, API keys, or request signing. All endpoints are
141/// publicly accessible.
142pub struct DydxRawHttpClient {
143    base_url: String,
144    client: HttpClient,
145    retry_manager: RetryManager<DydxHttpError>,
146    cancellation_token: CancellationToken,
147    is_testnet: bool,
148}
149
150impl Default for DydxRawHttpClient {
151    fn default() -> Self {
152        Self::new(None, Some(60), None, false, None)
153            .expect("Failed to create default DydxRawHttpClient")
154    }
155}
156
157impl Debug for DydxRawHttpClient {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct(stringify!(DydxRawHttpClient))
160            .field("base_url", &self.base_url)
161            .field("is_testnet", &self.is_testnet)
162            .finish_non_exhaustive()
163    }
164}
165
166impl DydxRawHttpClient {
167    /// Cancel all pending HTTP requests.
168    pub fn cancel_all_requests(&self) {
169        self.cancellation_token.cancel();
170    }
171
172    /// Get the cancellation token for this client.
173    pub fn cancellation_token(&self) -> &CancellationToken {
174        &self.cancellation_token
175    }
176
177    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
178    /// optionally overridden with a custom base URL.
179    ///
180    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the retry manager cannot be created.
185    pub fn new(
186        base_url: Option<String>,
187        timeout_secs: Option<u64>,
188        proxy_url: Option<String>,
189        is_testnet: bool,
190        retry_config: Option<RetryConfig>,
191    ) -> anyhow::Result<Self> {
192        let base_url = if is_testnet {
193            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
194        } else {
195            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
196        };
197
198        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
199
200        let mut headers = HashMap::new();
201        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
202
203        let client = HttpClient::new(
204            headers,
205            vec![], // No specific headers to extract from responses
206            vec![], // No keyed quotas (we use a single global quota)
207            Some(*DYDX_REST_QUOTA),
208            timeout_secs,
209            proxy_url,
210        )
211        .map_err(|e| {
212            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
213        })?;
214
215        Ok(Self {
216            base_url,
217            client,
218            retry_manager,
219            cancellation_token: CancellationToken::new(),
220            is_testnet,
221        })
222    }
223
224    /// Check if this client is configured for testnet.
225    #[must_use]
226    pub const fn is_testnet(&self) -> bool {
227        self.is_testnet
228    }
229
230    /// Get the base URL being used by this client.
231    #[must_use]
232    pub fn base_url(&self) -> &str {
233        &self.base_url
234    }
235
236    /// Send a request to a dYdX Indexer API endpoint.
237    ///
238    /// **Note**: dYdX Indexer API does not require authentication headers.
239    ///
240    /// # Errors
241    ///
242    /// Returns an error if:
243    /// - The HTTP request fails.
244    /// - The response has a non-success HTTP status code.
245    /// - The response body cannot be deserialized to type `T`.
246    /// - The request is canceled.
247    pub async fn send_request<T>(
248        &self,
249        method: Method,
250        endpoint: &str,
251        query_params: Option<&str>,
252    ) -> Result<T, DydxHttpError>
253    where
254        T: DeserializeOwned,
255    {
256        let url = if let Some(params) = query_params {
257            format!("{}{endpoint}?{params}", self.base_url)
258        } else {
259            format!("{}{endpoint}", self.base_url)
260        };
261
262        let operation = || async {
263            let request = self
264                .client
265                .request_with_ustr_keys(
266                    method.clone(),
267                    url.clone(),
268                    None, // No params
269                    None, // No additional headers
270                    None, // No body for GET requests
271                    None, // Use default timeout
272                    None, // No specific rate limit keys (using global quota)
273                )
274                .await
275                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
276
277            if !request.status.is_success() {
278                return Err(DydxHttpError::HttpStatus {
279                    status: request.status.as_u16(),
280                    message: String::from_utf8_lossy(&request.body).to_string(),
281                });
282            }
283
284            Ok(request)
285        };
286
287        // Retry strategy for dYdX Indexer API:
288        // 1. Network errors: always retry (transient connection issues)
289        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
290        // 3. Client errors (4xx except 429): should NOT be retried
291        let should_retry = |error: &DydxHttpError| -> bool {
292            match error {
293                DydxHttpError::HttpClientError(_) => true,
294                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
295                _ => false,
296            }
297        };
298
299        let create_error = |msg: String| -> DydxHttpError {
300            if msg == "canceled" {
301                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
302            } else if msg.contains("Timed out") {
303                // Timeouts are transient — map to HttpClientError so they are retried
304                DydxHttpError::HttpClientError(msg)
305            } else {
306                DydxHttpError::ValidationError(msg)
307            }
308        };
309
310        let response = self
311            .retry_manager
312            .execute_with_retry_with_cancel(
313                endpoint,
314                operation,
315                should_retry,
316                create_error,
317                &self.cancellation_token,
318            )
319            .await?;
320
321        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
322            error: e.to_string(),
323            body: String::from_utf8_lossy(&response.body).to_string(),
324        })
325    }
326
327    /// Send a POST request to a dYdX Indexer API endpoint.
328    ///
329    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if:
334    /// - The request body cannot be serialized to JSON.
335    /// - The HTTP request fails.
336    /// - The response has a non-success HTTP status code.
337    /// - The response body cannot be deserialized to type `T`.
338    /// - The request is canceled.
339    pub async fn send_post_request<T, B>(
340        &self,
341        endpoint: &str,
342        body: &B,
343    ) -> Result<T, DydxHttpError>
344    where
345        T: DeserializeOwned,
346        B: Serialize,
347    {
348        let url = format!("{}{endpoint}", self.base_url);
349
350        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
351            error: e.to_string(),
352        })?;
353
354        let operation = || async {
355            let request = self
356                .client
357                .request_with_ustr_keys(
358                    Method::POST,
359                    url.clone(),
360                    None, // No params
361                    None, // No additional headers (content-type handled by body)
362                    Some(body_bytes.clone()),
363                    None, // Use default timeout
364                    None, // No specific rate limit keys (using global quota)
365                )
366                .await
367                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
368
369            if !request.status.is_success() {
370                return Err(DydxHttpError::HttpStatus {
371                    status: request.status.as_u16(),
372                    message: String::from_utf8_lossy(&request.body).to_string(),
373                });
374            }
375
376            Ok(request)
377        };
378
379        // Retry strategy (same as GET requests)
380        let should_retry = |error: &DydxHttpError| -> bool {
381            match error {
382                DydxHttpError::HttpClientError(_) => true,
383                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
384                _ => false,
385            }
386        };
387
388        let create_error = |msg: String| -> DydxHttpError {
389            if msg == "canceled" {
390                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
391            } else if msg.contains("Timed out") {
392                // Timeouts are transient — map to HttpClientError so they are retried
393                DydxHttpError::HttpClientError(msg)
394            } else {
395                DydxHttpError::ValidationError(msg)
396            }
397        };
398
399        let response = self
400            .retry_manager
401            .execute_with_retry_with_cancel(
402                endpoint,
403                operation,
404                should_retry,
405                create_error,
406                &self.cancellation_token,
407            )
408            .await?;
409
410        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
411            error: e.to_string(),
412            body: String::from_utf8_lossy(&response.body).to_string(),
413        })
414    }
415
416    /// Fetch all perpetual markets from dYdX.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if the HTTP request fails or response parsing fails.
421    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
422        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
423            .await
424    }
425
426    /// Fetch a single perpetual market by ticker.
427    ///
428    /// Uses the `market` query parameter for efficient single-market fetch.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the HTTP request fails or response parsing fails.
433    pub async fn get_market(
434        &self,
435        ticker: &str,
436    ) -> Result<super::models::MarketsResponse, DydxHttpError> {
437        let query = format!("ticker={ticker}");
438        self.send_request(Method::GET, "/v4/perpetualMarkets", Some(&query))
439            .await
440    }
441
442    /// Fetch all instruments and parse them into Nautilus `InstrumentAny` types.
443    ///
444    /// This method fetches all perpetual markets from dYdX and converts them
445    /// into Nautilus instrument definitions using the `parse_instrument_any` function.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if:
450    /// - The HTTP request fails.
451    /// - The response cannot be parsed.
452    /// - Any instrument parsing fails.
453    ///
454    pub async fn fetch_instruments(
455        &self,
456        maker_fee: Option<Decimal>,
457        taker_fee: Option<Decimal>,
458    ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
459        let markets_response = self.get_markets().await?;
460        let ts_init = get_atomic_clock_realtime().get_time_ns();
461
462        let mut instruments = Vec::new();
463        let mut skipped_inactive = 0;
464
465        for (ticker, market) in markets_response.markets {
466            if !super::parse::is_market_active(&market.status) {
467                log::debug!(
468                    "Skipping inactive market {ticker} (status: {:?})",
469                    market.status
470                );
471                skipped_inactive += 1;
472                continue;
473            }
474
475            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
476                Ok(instrument) => {
477                    instruments.push(instrument);
478                }
479                Err(e) => {
480                    log::error!("Failed to parse instrument {ticker}: {e}");
481                }
482            }
483        }
484
485        if skipped_inactive > 0 {
486            log::info!(
487                "Parsed {} instruments, skipped {} inactive",
488                instruments.len(),
489                skipped_inactive
490            );
491        } else {
492            log::info!("Parsed {} instruments", instruments.len());
493        }
494
495        Ok(instruments)
496    }
497
498    /// Fetch orderbook for a specific market.
499    ///
500    /// # Errors
501    ///
502    /// Returns an error if the HTTP request fails or response parsing fails.
503    pub async fn get_orderbook(
504        &self,
505        ticker: &str,
506    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
507        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
508        self.send_request(Method::GET, &endpoint, None).await
509    }
510
511    /// Fetch recent trades for a market.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the HTTP request fails or response parsing fails.
516    pub async fn get_trades(
517        &self,
518        ticker: &str,
519        limit: Option<u32>,
520        starting_before_or_at_height: Option<u64>,
521    ) -> Result<super::models::TradesResponse, DydxHttpError> {
522        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
523        let mut query_parts = Vec::new();
524        if let Some(l) = limit {
525            query_parts.push(format!("limit={l}"));
526        }
527        if let Some(height) = starting_before_or_at_height {
528            query_parts.push(format!("createdBeforeOrAtHeight={height}"));
529        }
530        let query = if query_parts.is_empty() {
531            None
532        } else {
533            Some(query_parts.join("&"))
534        };
535        self.send_request(Method::GET, &endpoint, query.as_deref())
536            .await
537    }
538
539    /// Fetch candles/klines for a market.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if the HTTP request fails or response parsing fails.
544    pub async fn get_candles(
545        &self,
546        ticker: &str,
547        resolution: DydxCandleResolution,
548        limit: Option<u32>,
549        from_iso: Option<DateTime<Utc>>,
550        to_iso: Option<DateTime<Utc>>,
551    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
552        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
553        let mut query_parts = vec![format!("resolution={resolution}")];
554        if let Some(l) = limit {
555            query_parts.push(format!("limit={l}"));
556        }
557        if let Some(from) = from_iso {
558            let from_str = from.to_rfc3339();
559            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
560        }
561        if let Some(to) = to_iso {
562            let to_str = to.to_rfc3339();
563            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
564        }
565        let query = query_parts.join("&");
566        self.send_request(Method::GET, &endpoint, Some(&query))
567            .await
568    }
569
570    /// Fetch subaccount information.
571    ///
572    /// # Errors
573    ///
574    /// Returns an error if the HTTP request fails or response parsing fails.
575    pub async fn get_subaccount(
576        &self,
577        address: &str,
578        subaccount_number: u32,
579    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
580        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
581        self.send_request(Method::GET, &endpoint, None).await
582    }
583
584    /// Fetch fills for a subaccount.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the HTTP request fails or response parsing fails.
589    pub async fn get_fills(
590        &self,
591        address: &str,
592        subaccount_number: u32,
593        market: Option<&str>,
594        limit: Option<u32>,
595    ) -> Result<super::models::FillsResponse, DydxHttpError> {
596        let endpoint = "/v4/fills";
597        let mut query_parts = vec![
598            format!("address={address}"),
599            format!("subaccountNumber={subaccount_number}"),
600        ];
601        if let Some(m) = market {
602            query_parts.push(format!("market={m}"));
603        }
604        if let Some(l) = limit {
605            query_parts.push(format!("limit={l}"));
606        }
607        let query = query_parts.join("&");
608        self.send_request(Method::GET, endpoint, Some(&query)).await
609    }
610
611    /// Fetch orders for a subaccount.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if the HTTP request fails or response parsing fails.
616    pub async fn get_orders(
617        &self,
618        address: &str,
619        subaccount_number: u32,
620        market: Option<&str>,
621        limit: Option<u32>,
622    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
623        let endpoint = "/v4/orders";
624        let mut query_parts = vec![
625            format!("address={address}"),
626            format!("subaccountNumber={subaccount_number}"),
627        ];
628        if let Some(m) = market {
629            query_parts.push(format!("market={m}"));
630        }
631        if let Some(l) = limit {
632            query_parts.push(format!("limit={l}"));
633        }
634        let query = query_parts.join("&");
635        self.send_request(Method::GET, endpoint, Some(&query)).await
636    }
637
638    /// Fetch transfers for a subaccount.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the HTTP request fails or response parsing fails.
643    pub async fn get_transfers(
644        &self,
645        address: &str,
646        subaccount_number: u32,
647        limit: Option<u32>,
648    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
649        let endpoint = "/v4/transfers";
650        let mut query_parts = vec![
651            format!("address={address}"),
652            format!("subaccountNumber={subaccount_number}"),
653        ];
654        if let Some(l) = limit {
655            query_parts.push(format!("limit={l}"));
656        }
657        let query = query_parts.join("&");
658        self.send_request(Method::GET, endpoint, Some(&query)).await
659    }
660
661    /// Get current server time.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if the HTTP request fails or response parsing fails.
666    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
667        self.send_request(Method::GET, "/v4/time", None).await
668    }
669
670    /// Get current blockchain height.
671    ///
672    /// # Errors
673    ///
674    /// Returns an error if the HTTP request fails or response parsing fails.
675    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
676        self.send_request(Method::GET, "/v4/height", None).await
677    }
678}
679
680/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
681///
682/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
683/// into the Nautilus domain model, following the two-layer pattern established
684/// in OKX, Bybit, and BitMEX adapters.
685///
686/// **Architecture:**
687/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
688/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
689///
690/// The domain client:
691/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
692/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
693/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
694/// - Tracks cache initialization state for optimizations.
695#[derive(Debug)]
696#[cfg_attr(
697    feature = "python",
698    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
699)]
700pub struct DydxHttpClient {
701    /// Raw HTTP client wrapped in Arc for efficient cloning.
702    pub(crate) inner: Arc<DydxRawHttpClient>,
703    /// Shared instrument cache with multiple lookup indices.
704    ///
705    /// This cache is shared across HTTP client, WebSocket client, and execution client.
706    /// It provides O(1) lookups by symbol, market ticker, or clob_pair_id.
707    pub(crate) instrument_cache: Arc<InstrumentCache>,
708}
709
710impl Clone for DydxHttpClient {
711    fn clone(&self) -> Self {
712        Self {
713            inner: self.inner.clone(),
714            instrument_cache: Arc::clone(&self.instrument_cache),
715        }
716    }
717}
718
719impl Default for DydxHttpClient {
720    fn default() -> Self {
721        Self::new(None, Some(60), None, false, None)
722            .expect("Failed to create default DydxHttpClient")
723    }
724}
725
726impl DydxHttpClient {
727    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
728    /// optionally overridden with a custom base URL.
729    ///
730    /// This constructor creates its own internal instrument cache. For shared caching
731    /// across multiple clients, use [`new_with_cache`](Self::new_with_cache) instead.
732    ///
733    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
734    /// Order submission and trading operations use gRPC with blockchain transaction signing.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
739    pub fn new(
740        base_url: Option<String>,
741        timeout_secs: Option<u64>,
742        proxy_url: Option<String>,
743        is_testnet: bool,
744        retry_config: Option<RetryConfig>,
745    ) -> anyhow::Result<Self> {
746        Self::new_with_cache(
747            base_url,
748            timeout_secs,
749            proxy_url,
750            is_testnet,
751            retry_config,
752            Arc::new(InstrumentCache::new()),
753        )
754    }
755
756    /// Creates a new [`DydxHttpClient`] with a shared instrument cache.
757    ///
758    /// Use this constructor when sharing instrument data between HTTP client,
759    /// WebSocket client, and execution client.
760    ///
761    /// # Arguments
762    ///
763    /// * `instrument_cache` - Shared instrument cache for lookups by symbol, ticker, or clob_pair_id
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
768    pub fn new_with_cache(
769        base_url: Option<String>,
770        timeout_secs: Option<u64>,
771        proxy_url: Option<String>,
772        is_testnet: bool,
773        retry_config: Option<RetryConfig>,
774        instrument_cache: Arc<InstrumentCache>,
775    ) -> anyhow::Result<Self> {
776        Ok(Self {
777            inner: Arc::new(DydxRawHttpClient::new(
778                base_url,
779                timeout_secs,
780                proxy_url,
781                is_testnet,
782                retry_config,
783            )?),
784            instrument_cache,
785        })
786    }
787
788    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
789    ///
790    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
791    /// for automatic caching, or call `cache_instruments()` manually with the results.
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if the HTTP request or parsing fails.
796    /// Individual instrument parsing errors are logged as warnings.
797    pub async fn request_instruments(
798        &self,
799        symbol: Option<String>,
800        maker_fee: Option<Decimal>,
801        taker_fee: Option<Decimal>,
802    ) -> anyhow::Result<Vec<InstrumentAny>> {
803        let markets_response = self.inner.get_markets().await?;
804        let ts_init = get_atomic_clock_realtime().get_time_ns();
805
806        let mut instruments = Vec::new();
807        let mut skipped_inactive = 0;
808
809        for (ticker, market) in markets_response.markets {
810            // Filter by symbol if specified
811            if let Some(ref sym) = symbol
812                && ticker != *sym
813            {
814                continue;
815            }
816
817            if !super::parse::is_market_active(&market.status) {
818                log::debug!(
819                    "Skipping inactive market {ticker} (status: {:?})",
820                    market.status
821                );
822                skipped_inactive += 1;
823                continue;
824            }
825
826            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
827                Ok(instrument) => {
828                    instruments.push(instrument);
829                }
830                Err(e) => {
831                    log::error!("Failed to parse instrument {ticker}: {e}");
832                }
833            }
834        }
835
836        if skipped_inactive > 0 {
837            log::info!(
838                "Parsed {} instruments, skipped {} inactive",
839                instruments.len(),
840                skipped_inactive
841            );
842        } else {
843            log::debug!("Parsed {} instruments", instruments.len());
844        }
845
846        Ok(instruments)
847    }
848
849    /// Fetches instruments from the API and caches them.
850    ///
851    /// This is a convenience method that fetches instruments and populates both
852    /// the symbol-based and CLOB pair ID-based caches.
853    ///
854    /// On success, existing caches are cleared and repopulated atomically.
855    /// On failure, existing caches are preserved (no partial updates).
856    ///
857    /// # Errors
858    ///
859    /// Returns an error if the HTTP request fails.
860    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
861        // Fetch first - preserve existing cache on network failure
862        let markets_response = self.inner.get_markets().await?;
863        let ts_init = get_atomic_clock_realtime().get_time_ns();
864
865        let mut parsed_instruments = Vec::new();
866        let mut parsed_markets = Vec::new();
867        let mut skipped_inactive = 0;
868
869        for (ticker, market) in markets_response.markets {
870            if !super::parse::is_market_active(&market.status) {
871                log::debug!(
872                    "Skipping inactive market {ticker} (status: {:?})",
873                    market.status
874                );
875                skipped_inactive += 1;
876                continue;
877            }
878
879            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
880                Ok(instrument) => {
881                    parsed_instruments.push(instrument);
882                    parsed_markets.push(market);
883                }
884                Err(e) => {
885                    log::error!("Failed to parse instrument {ticker}: {e}");
886                }
887            }
888        }
889
890        // Only clear and repopulate cache after successful fetch and parse
891        self.instrument_cache.clear();
892
893        // Zip instruments with their market data for bulk insert
894        let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
895
896        if !items.is_empty() {
897            self.instrument_cache.insert_many(items.clone());
898        }
899
900        let count = items.len();
901
902        if skipped_inactive > 0 {
903            log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
904        } else {
905            log::info!("Cached {count} instruments");
906        }
907
908        Ok(())
909    }
910
911    /// Fetches a single instrument by ticker and caches it.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the HTTP request fails.
916    pub async fn fetch_and_cache_single_instrument(
917        &self,
918        ticker: &str,
919    ) -> anyhow::Result<Option<InstrumentAny>> {
920        let markets_response = self.inner.get_market(ticker).await?;
921        let ts_init = get_atomic_clock_realtime().get_time_ns();
922
923        // The API returns all markets if ticker not found, so check specifically
924        if let Some(market) = markets_response.markets.get(ticker) {
925            if !super::parse::is_market_active(&market.status) {
926                log::debug!(
927                    "Skipping inactive market {ticker} (status: {:?})",
928                    market.status
929                );
930                return Ok(None);
931            }
932
933            let instrument = parse_instrument_any(market, None, None, ts_init)?;
934            self.instrument_cache
935                .insert(instrument.clone(), market.clone());
936
937            log::info!("Fetched and cached new instrument: {ticker}");
938            return Ok(Some(instrument));
939        }
940
941        Ok(None)
942    }
943
944    /// Caches multiple instruments (symbol lookup only).
945    ///
946    /// Use `fetch_and_cache_instruments()` for full caching with market params.
947    /// Any existing instruments with the same symbols will be replaced.
948    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
949        self.instrument_cache.insert_instruments_only(instruments);
950    }
951
952    /// Caches a single instrument (symbol lookup only).
953    ///
954    /// Use `fetch_and_cache_instruments()` for full caching with market params.
955    /// Any existing instrument with the same symbol will be replaced.
956    pub fn cache_instrument(&self, instrument: InstrumentAny) {
957        self.instrument_cache.insert_instrument_only(instrument);
958    }
959
960    /// Gets an instrument from the cache by InstrumentId.
961    #[must_use]
962    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
963        self.instrument_cache.get(instrument_id)
964    }
965
966    /// Gets an instrument by CLOB pair ID.
967    ///
968    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
969    #[must_use]
970    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
971        self.instrument_cache.get_by_clob_id(clob_pair_id)
972    }
973
974    /// Gets an instrument by market ticker (e.g., "BTC-USD").
975    ///
976    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
977    #[must_use]
978    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
979        self.instrument_cache.get_by_market(ticker)
980    }
981
982    /// Gets market parameters for order submission from the cached market data.
983    ///
984    /// Returns the quantization parameters needed by OrderBuilder to construct
985    /// properly formatted orders for the dYdX v4 protocol.
986    ///
987    /// # Errors
988    ///
989    /// Returns None if the instrument is not found in the market params cache.
990    #[must_use]
991    pub fn get_market_params(
992        &self,
993        instrument_id: &InstrumentId,
994    ) -> Option<super::models::PerpetualMarket> {
995        self.instrument_cache.get_market_params(instrument_id)
996    }
997
998    /// Requests historical trades for a symbol.
999    ///
1000    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
1001    /// Results are ordered by creation time descending (newest first).
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the HTTP request fails or response cannot be parsed.
1006    pub async fn request_trades(
1007        &self,
1008        symbol: &str,
1009        limit: Option<u32>,
1010        starting_before_or_at_height: Option<u64>,
1011    ) -> anyhow::Result<super::models::TradesResponse> {
1012        self.inner
1013            .get_trades(symbol, limit, starting_before_or_at_height)
1014            .await
1015            .map_err(Into::into)
1016    }
1017
1018    /// Requests historical candles for a symbol.
1019    ///
1020    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
1021    /// Results are ordered by start time ascending (oldest first).
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns an error if the HTTP request fails or response cannot be parsed.
1026    pub async fn request_candles(
1027        &self,
1028        symbol: &str,
1029        resolution: DydxCandleResolution,
1030        limit: Option<u32>,
1031        from_iso: Option<DateTime<Utc>>,
1032        to_iso: Option<DateTime<Utc>>,
1033    ) -> anyhow::Result<super::models::CandlesResponse> {
1034        self.inner
1035            .get_candles(symbol, resolution, limit, from_iso, to_iso)
1036            .await
1037            .map_err(Into::into)
1038    }
1039
1040    /// Requests historical bars for an instrument with optional pagination.
1041    ///
1042    /// Fetches candle data from the dYdX Indexer API and converts to Nautilus
1043    /// `Bar` objects. Supports time-chunked pagination for large date ranges.
1044    ///
1045    /// The resolution is derived internally from `bar_type` (no need to pass
1046    /// `DydxCandleResolution`). Incomplete bars (where `ts_event >= now`) are
1047    /// filtered out.
1048    ///
1049    /// Results are returned in chronological order (oldest first).
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if:
1054    /// - The bar type uses unsupported aggregation/price type.
1055    /// - The HTTP request fails or response cannot be parsed.
1056    /// - The instrument is not found in the cache.
1057    pub async fn request_bars(
1058        &self,
1059        bar_type: BarType,
1060        start: Option<DateTime<Utc>>,
1061        end: Option<DateTime<Utc>>,
1062        limit: Option<u32>,
1063        timestamp_on_close: bool,
1064    ) -> anyhow::Result<Vec<Bar>> {
1065        let resolution = bar_type_to_resolution(&bar_type)?;
1066        let instrument_id = bar_type.instrument_id();
1067
1068        let instrument = self
1069            .get_instrument(&instrument_id)
1070            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1071
1072        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1073        let price_precision = instrument.price_precision();
1074        let size_precision = instrument.size_precision();
1075        let ts_init = get_atomic_clock_realtime().get_time_ns();
1076
1077        let mut all_bars: Vec<Bar> = Vec::new();
1078
1079        // Determine bar duration in seconds for pagination chunking
1080        let spec = bar_type.spec();
1081        let bar_secs: i64 = match spec.aggregation {
1082            BarAggregation::Minute => spec.step.get() as i64 * 60,
1083            BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1084            BarAggregation::Day => spec.step.get() as i64 * 86_400,
1085            _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1086        };
1087
1088        match (start, end) {
1089            // Time-chunked pagination for date ranges
1090            (Some(range_start), Some(range_end)) if range_end > range_start => {
1091                let overall_limit = limit.unwrap_or(u32::MAX);
1092                let mut remaining = overall_limit;
1093                let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1094                let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1095                let mut chunk_start = range_start;
1096
1097                while chunk_start < range_end && remaining > 0 {
1098                    let chunk_end = (chunk_start + chunk_duration).min(range_end);
1099                    let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1100
1101                    let response = self
1102                        .inner
1103                        .get_candles(
1104                            ticker,
1105                            resolution,
1106                            Some(per_call_limit),
1107                            Some(chunk_start),
1108                            Some(chunk_end),
1109                        )
1110                        .await?;
1111
1112                    let count = response.candles.len() as u32;
1113                    if count == 0 {
1114                        break;
1115                    }
1116
1117                    for candle in &response.candles {
1118                        match super::parse::parse_bar(
1119                            candle,
1120                            bar_type,
1121                            price_precision,
1122                            size_precision,
1123                            timestamp_on_close,
1124                            ts_init,
1125                        ) {
1126                            Ok(bar) => all_bars.push(bar),
1127                            Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1128                        }
1129                    }
1130
1131                    if remaining <= count {
1132                        break;
1133                    }
1134                    remaining -= count;
1135                    chunk_start += chunk_duration;
1136                }
1137            }
1138            // Single request (no date range or invalid range)
1139            _ => {
1140                let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1141                let response = self
1142                    .inner
1143                    .get_candles(ticker, resolution, Some(req_limit), None, None)
1144                    .await?;
1145
1146                for candle in &response.candles {
1147                    match super::parse::parse_bar(
1148                        candle,
1149                        bar_type,
1150                        price_precision,
1151                        size_precision,
1152                        timestamp_on_close,
1153                        ts_init,
1154                    ) {
1155                        Ok(bar) => all_bars.push(bar),
1156                        Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1157                    }
1158                }
1159            }
1160        }
1161
1162        // Filter incomplete bars (ts_event >= current time)
1163        let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1164        all_bars.retain(|bar| bar.ts_event < current_time_ns);
1165
1166        Ok(all_bars)
1167    }
1168
1169    /// Requests historical trade ticks for an instrument with optional pagination.
1170    ///
1171    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1172    /// `TradeTick` objects. Supports cursor-based pagination using block height
1173    /// and client-side time filtering (the dYdX API has no timestamp filter).
1174    ///
1175    /// Results are returned in chronological order (oldest first).
1176    ///
1177    /// # Errors
1178    ///
1179    /// Returns an error if the HTTP request fails, response cannot be parsed,
1180    /// or the instrument is not found in the cache.
1181    ///
1182    /// # Panics
1183    ///
1184    /// This function will panic if the API returns a non-empty trades response
1185    /// but `last()` on the trades vector returns `None` (should never happen).
1186    pub async fn request_trade_ticks(
1187        &self,
1188        instrument_id: InstrumentId,
1189        start: Option<DateTime<Utc>>,
1190        end: Option<DateTime<Utc>>,
1191        limit: Option<u32>,
1192    ) -> anyhow::Result<Vec<TradeTick>> {
1193        const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1194        const DYDX_BLOCK_TIME_SECS: f64 = 1.1;
1195
1196        // Validation
1197        if let (Some(s), Some(e)) = (start, end) {
1198            anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1199        }
1200
1201        let instrument = self
1202            .get_instrument(&instrument_id)
1203            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1204
1205        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1206        let price_precision = instrument.price_precision();
1207        let size_precision = instrument.size_precision();
1208        let ts_init = get_atomic_clock_realtime().get_time_ns();
1209
1210        // When an end time is provided, estimate the block height at that time
1211        // so we can skip directly to the relevant window instead of paginating
1212        // from the latest trade backward (which can be extremely slow for liquid markets).
1213        let initial_cursor = if let Some(end_time) = end {
1214            match self.inner.get_height().await {
1215                Ok(height_resp) => {
1216                    let secs_ahead = (height_resp.time - end_time).num_seconds();
1217                    if secs_ahead > 0 {
1218                        let blocks_to_skip = (secs_ahead as f64 / DYDX_BLOCK_TIME_SECS) as u64;
1219                        let target = height_resp.height.saturating_sub(blocks_to_skip);
1220                        log::debug!(
1221                            "Estimated block height at {end_time}: {target} \
1222                             (current: {}, skipping ~{blocks_to_skip} blocks)",
1223                            height_resp.height,
1224                        );
1225                        Some(target)
1226                    } else {
1227                        None // end_time is in the future, start from latest
1228                    }
1229                }
1230                Err(e) => {
1231                    log::warn!(
1232                        "Failed to get block height for time skip, paginating from latest: {e}"
1233                    );
1234                    None
1235                }
1236            }
1237        } else {
1238            None
1239        };
1240
1241        let overall_limit = limit.unwrap_or(u32::MAX);
1242        let mut remaining = overall_limit;
1243        let mut cursor_height: Option<u64> = initial_cursor;
1244        let mut all_trades = Vec::new();
1245
1246        loop {
1247            let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1248            let response = self
1249                .inner
1250                .get_trades(ticker, Some(page_limit), cursor_height)
1251                .await?;
1252
1253            let page_count = response.trades.len() as u32;
1254            if page_count == 0 {
1255                break;
1256            }
1257
1258            // Trades come newest-first; oldest is last
1259            let oldest_trade = response.trades.last().unwrap();
1260
1261            // Update cursor for next page (go further back in time)
1262            cursor_height = Some(oldest_trade.created_at_height.saturating_sub(1));
1263
1264            // Break if we've reached before the start boundary
1265            if let Some(s) = start
1266                && oldest_trade.created_at < s
1267            {
1268                // This page contains trades before start — filter and stop
1269                for trade in &response.trades {
1270                    if start.is_some_and(|s| trade.created_at < s) {
1271                        continue;
1272                    }
1273                    if end.is_some_and(|e| trade.created_at > e) {
1274                        continue;
1275                    }
1276                    all_trades.push(super::parse::parse_trade_tick(
1277                        trade,
1278                        instrument_id,
1279                        price_precision,
1280                        size_precision,
1281                        ts_init,
1282                    )?);
1283                }
1284                break;
1285            }
1286
1287            // Convert all trades in this page (with time filtering)
1288            for trade in &response.trades {
1289                if start.is_some_and(|s| trade.created_at < s) {
1290                    continue;
1291                }
1292                if end.is_some_and(|e| trade.created_at > e) {
1293                    continue;
1294                }
1295                all_trades.push(super::parse::parse_trade_tick(
1296                    trade,
1297                    instrument_id,
1298                    price_precision,
1299                    size_precision,
1300                    ts_init,
1301                )?);
1302            }
1303
1304            remaining = remaining.saturating_sub(page_count);
1305
1306            // Break on partial page (no more data) or limit reached
1307            if page_count < page_limit || remaining == 0 {
1308                break;
1309            }
1310        }
1311
1312        // Reverse to chronological order (oldest first) and dedup
1313        all_trades.reverse();
1314        all_trades.dedup_by(|a, b| a.trade_id == b.trade_id);
1315
1316        // Truncate to requested limit
1317        if let Some(lim) = limit {
1318            all_trades.truncate(lim as usize);
1319        }
1320
1321        Ok(all_trades)
1322    }
1323
1324    /// Requests an order book snapshot for a symbol.
1325    ///
1326    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1327    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1328    /// with a CLEAR action followed by ADD actions for each level.
1329    ///
1330    /// # Errors
1331    ///
1332    /// Returns an error if the HTTP request fails, response cannot be parsed,
1333    /// or the instrument is not found in the cache.
1334    pub async fn request_orderbook_snapshot(
1335        &self,
1336        instrument_id: InstrumentId,
1337    ) -> anyhow::Result<OrderBookDeltas> {
1338        let instrument = self
1339            .get_instrument(&instrument_id)
1340            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1341
1342        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1343        let response = self.inner.get_orderbook(ticker).await?;
1344
1345        let ts_init = get_atomic_clock_realtime().get_time_ns();
1346
1347        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1348
1349        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1350
1351        for (i, level) in response.bids.iter().enumerate() {
1352            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1353            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1354
1355            let order = BookOrder::new(
1356                NautilusOrderSide::Buy,
1357                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1358                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1359                0,
1360            );
1361
1362            deltas.push(OrderBookDelta::new(
1363                instrument_id,
1364                BookAction::Add,
1365                order,
1366                flags,
1367                0,
1368                ts_init,
1369                ts_init,
1370            ));
1371        }
1372
1373        for (i, level) in response.asks.iter().enumerate() {
1374            let is_last = i == response.asks.len() - 1;
1375            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1376
1377            let order = BookOrder::new(
1378                NautilusOrderSide::Sell,
1379                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1380                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1381                0,
1382            );
1383
1384            deltas.push(OrderBookDelta::new(
1385                instrument_id,
1386                BookAction::Add,
1387                order,
1388                flags,
1389                0,
1390                ts_init,
1391                ts_init,
1392            ));
1393        }
1394
1395        Ok(OrderBookDeltas::new(instrument_id, deltas))
1396    }
1397
1398    /// Exposes raw HTTP client for testing and advanced use cases.
1399    ///
1400    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
1401    /// where low-level API access is needed. Most users should use the domain
1402    /// client methods instead.
1403    #[must_use]
1404    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1405        &self.inner
1406    }
1407
1408    /// Check if this client is configured for testnet.
1409    #[must_use]
1410    pub fn is_testnet(&self) -> bool {
1411        self.inner.is_testnet()
1412    }
1413
1414    /// Get the base URL being used by this client.
1415    #[must_use]
1416    pub fn base_url(&self) -> &str {
1417        self.inner.base_url()
1418    }
1419
1420    /// Check if the instrument cache has been initialized.
1421    #[must_use]
1422    pub fn is_cache_initialized(&self) -> bool {
1423        self.instrument_cache.is_initialized()
1424    }
1425
1426    /// Get the number of instruments currently cached.
1427    #[must_use]
1428    pub fn cached_instruments_count(&self) -> usize {
1429        self.instrument_cache.len()
1430    }
1431
1432    /// Returns a reference to the shared instrument cache.
1433    ///
1434    /// The cache provides lookups by symbol, market ticker, and clob_pair_id.
1435    #[must_use]
1436    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
1437        &self.instrument_cache
1438    }
1439
1440    /// Returns all cached instruments.
1441    ///
1442    /// This is a convenience method that collects all instruments into a Vec.
1443    #[must_use]
1444    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
1445        self.instrument_cache.all_instruments()
1446    }
1447
1448    /// Returns all cached instrument IDs.
1449    #[must_use]
1450    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
1451        self.instrument_cache.all_instrument_ids()
1452    }
1453
1454    /// Requests order status reports for a subaccount.
1455    ///
1456    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1457    /// `OrderStatusReport` objects.
1458    ///
1459    /// # Errors
1460    ///
1461    /// Returns an error if the HTTP request fails or parsing fails.
1462    pub async fn request_order_status_reports(
1463        &self,
1464        address: &str,
1465        subaccount_number: u32,
1466        account_id: AccountId,
1467        instrument_id: Option<InstrumentId>,
1468    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1469        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1470
1471        // Convert instrument_id to market filter
1472        let market = instrument_id.map(|id| {
1473            let symbol = id.symbol.to_string();
1474            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1475            symbol.trim_end_matches("-PERP").to_string()
1476        });
1477
1478        let orders = self
1479            .inner
1480            .get_orders(address, subaccount_number, market.as_deref(), None)
1481            .await?;
1482
1483        let mut reports = Vec::new();
1484
1485        for order in orders {
1486            // Get instrument by clob_pair_id
1487            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1488                Some(inst) => inst,
1489                None => {
1490                    log::warn!(
1491                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1492                        order.id,
1493                        order.clob_pair_id
1494                    );
1495                    continue;
1496                }
1497            };
1498
1499            // Filter by instrument_id if specified
1500            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1501                continue;
1502            }
1503
1504            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1505            {
1506                Ok(report) => reports.push(report),
1507                Err(e) => {
1508                    log::warn!("Failed to parse order {}: {e}", order.id);
1509                }
1510            }
1511        }
1512
1513        Ok(reports)
1514    }
1515
1516    /// Requests fill reports for a subaccount.
1517    ///
1518    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1519    /// `FillReport` objects.
1520    ///
1521    /// # Errors
1522    ///
1523    /// Returns an error if the HTTP request fails or parsing fails.
1524    pub async fn request_fill_reports(
1525        &self,
1526        address: &str,
1527        subaccount_number: u32,
1528        account_id: AccountId,
1529        instrument_id: Option<InstrumentId>,
1530    ) -> anyhow::Result<Vec<FillReport>> {
1531        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1532
1533        // Convert instrument_id to market filter
1534        let market = instrument_id.map(|id| {
1535            let symbol = id.symbol.to_string();
1536            symbol.trim_end_matches("-PERP").to_string()
1537        });
1538
1539        let fills_response = self
1540            .inner
1541            .get_fills(address, subaccount_number, market.as_deref(), None)
1542            .await?;
1543
1544        let mut reports = Vec::new();
1545
1546        for fill in fills_response.fills {
1547            // Get instrument by market ticker (e.g., "BTC-USD")
1548            let instrument = match self.get_instrument_by_market(&fill.market) {
1549                Some(inst) => inst,
1550                None => {
1551                    log::warn!(
1552                        "Skipping fill {}: no cached instrument for market {}",
1553                        fill.id,
1554                        fill.market
1555                    );
1556                    continue;
1557                }
1558            };
1559
1560            // Filter by instrument_id if specified
1561            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1562                continue;
1563            }
1564
1565            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1566                Ok(report) => reports.push(report),
1567                Err(e) => {
1568                    log::warn!("Failed to parse fill {}: {e}", fill.id);
1569                }
1570            }
1571        }
1572
1573        Ok(reports)
1574    }
1575
1576    /// Requests position status reports for a subaccount.
1577    ///
1578    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1579    /// `PositionStatusReport` objects.
1580    ///
1581    /// # Errors
1582    ///
1583    /// Returns an error if the HTTP request fails or parsing fails.
1584    pub async fn request_position_status_reports(
1585        &self,
1586        address: &str,
1587        subaccount_number: u32,
1588        account_id: AccountId,
1589        instrument_id: Option<InstrumentId>,
1590    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1591        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1592
1593        let subaccount_response = self
1594            .inner
1595            .get_subaccount(address, subaccount_number)
1596            .await?;
1597
1598        let mut reports = Vec::new();
1599
1600        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1601            // Get instrument by market ticker (e.g., "BTC-USD")
1602            let instrument = match self.get_instrument_by_market(&market) {
1603                Some(inst) => inst,
1604                None => {
1605                    log::warn!("Skipping position: no cached instrument for market {market}");
1606                    continue;
1607                }
1608            };
1609
1610            // Filter by instrument_id if specified
1611            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1612                continue;
1613            }
1614
1615            match super::parse::parse_position_status_report(
1616                &position,
1617                &instrument,
1618                account_id,
1619                ts_init,
1620            ) {
1621                Ok(report) => reports.push(report),
1622                Err(e) => {
1623                    log::warn!("Failed to parse position for {market}: {e}");
1624                }
1625            }
1626        }
1627
1628        Ok(reports)
1629    }
1630}
1631
1632#[cfg(test)]
1633mod tests {
1634    use rstest::rstest;
1635
1636    use super::*;
1637    use crate::http::error;
1638
1639    #[tokio::test]
1640    async fn test_raw_client_creation() {
1641        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1642        assert!(client.is_ok());
1643
1644        let client = client.unwrap();
1645        assert!(!client.is_testnet());
1646        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1647    }
1648
1649    #[tokio::test]
1650    async fn test_raw_client_testnet() {
1651        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1652        assert!(client.is_ok());
1653
1654        let client = client.unwrap();
1655        assert!(client.is_testnet());
1656        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1657    }
1658
1659    #[tokio::test]
1660    async fn test_domain_client_creation() {
1661        let client = DydxHttpClient::new(None, Some(30), None, false, None);
1662        assert!(client.is_ok());
1663
1664        let client = client.unwrap();
1665        assert!(!client.is_testnet());
1666        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1667        assert!(!client.is_cache_initialized());
1668        assert_eq!(client.cached_instruments_count(), 0);
1669    }
1670
1671    #[tokio::test]
1672    async fn test_domain_client_testnet() {
1673        let client = DydxHttpClient::new(None, Some(30), None, true, None);
1674        assert!(client.is_ok());
1675
1676        let client = client.unwrap();
1677        assert!(client.is_testnet());
1678        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1679    }
1680
1681    #[tokio::test]
1682    async fn test_domain_client_default() {
1683        let client = DydxHttpClient::default();
1684        assert!(!client.is_testnet());
1685        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1686        assert!(!client.is_cache_initialized());
1687    }
1688
1689    #[tokio::test]
1690    async fn test_domain_client_clone() {
1691        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1692
1693        // Clone before initialization
1694        let cloned = client.clone();
1695        assert!(!cloned.is_cache_initialized());
1696
1697        client.instrument_cache.insert_instruments_only(vec![]);
1698
1699        // Clone after initialization
1700        #[allow(clippy::redundant_clone)]
1701        let cloned_after = client.clone();
1702        assert!(cloned_after.is_cache_initialized());
1703    }
1704
1705    #[rstest]
1706    fn test_domain_client_get_instrument_not_found() {
1707        use nautilus_model::identifiers::{Symbol, Venue};
1708        let client = DydxHttpClient::default();
1709        let instrument_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
1710        let result = client.get_instrument(&instrument_id);
1711        assert!(result.is_none());
1712    }
1713
1714    #[tokio::test]
1715    async fn test_http_timeout_respects_configuration_and_does_not_block() {
1716        use axum::{Router, routing::get};
1717        use tokio::net::TcpListener;
1718
1719        async fn slow_handler() -> &'static str {
1720            // Sleep longer than the configured HTTP timeout.
1721            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1722            "ok"
1723        }
1724
1725        let router = Router::new().route("/v4/slow", get(slow_handler));
1726
1727        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1728        let addr = listener.local_addr().unwrap();
1729
1730        tokio::spawn(async move {
1731            axum::serve(listener, router.into_make_service())
1732                .await
1733                .unwrap();
1734        });
1735
1736        let base_url = format!("http://{addr}");
1737
1738        // Configure a small operation timeout and no retries so the request
1739        // fails quickly even though the handler sleeps for 5 seconds.
1740        let retry_config = RetryConfig {
1741            max_retries: 0,
1742            initial_delay_ms: 0,
1743            max_delay_ms: 0,
1744            backoff_factor: 1.0,
1745            jitter_ms: 0,
1746            operation_timeout_ms: Some(500),
1747            immediate_first: true,
1748            max_elapsed_ms: Some(1_000),
1749        };
1750
1751        // Keep HTTP client timeout at a typical value; rely on RetryManager
1752        // operation timeout to enforce non-blocking behavior.
1753        let client =
1754            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1755                .unwrap();
1756
1757        let start = std::time::Instant::now();
1758        let result: Result<serde_json::Value, error::DydxHttpError> =
1759            client.send_request(Method::GET, "/v4/slow", None).await;
1760        let elapsed = start.elapsed();
1761
1762        // Request should fail (timeout or client error), but without blocking the thread
1763        // for the full handler duration.
1764        assert!(result.is_err());
1765        assert!(elapsed < std::time::Duration::from_secs(3));
1766    }
1767}