nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Key Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
34//! - Zero-copy deserialization of large JSON payloads into domain models.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official Documentation
46//!
47//! | Endpoint                             | Reference                                              |
48//! |--------------------------------------|--------------------------------------------------------|
49//! | Market data                          | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets> |
50//! | Account data                         | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints                    | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility> |
52
53use std::{
54    collections::HashMap,
55    fmt::Debug,
56    num::NonZeroU32,
57    sync::{
58        Arc, LazyLock,
59        atomic::{AtomicBool, Ordering},
60    },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67    data::{
68        Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69        bar::get_bar_interval_ns,
70    },
71    enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72    identifiers::{AccountId, InstrumentId, TradeId},
73    instruments::{Instrument, InstrumentAny},
74    reports::{FillReport, OrderStatusReport, PositionStatusReport},
75    types::{Price, Quantity},
76};
77use nautilus_network::{
78    http::{HttpClient, Method, USER_AGENT},
79    ratelimiter::quota::Quota,
80    retry::{RetryConfig, RetryManager},
81};
82use serde::{Deserialize, Serialize, de::DeserializeOwned};
83use tokio_util::sync::CancellationToken;
84use ustr::Ustr;
85
86use super::error::DydxHttpError;
87use crate::common::{
88    consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
89    enums::DydxCandleResolution,
90    parse::extract_raw_symbol,
91};
92
93/// Default dYdX Indexer REST API rate limit.
94///
95/// The dYdX Indexer API rate limits are generous for read-only operations:
96/// - General: 100 requests per 10 seconds per IP
97/// - We use a conservative 10 requests per second as the default quota.
98pub static DYDX_REST_QUOTA: LazyLock<Quota> =
99    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
100
101/// Represents a dYdX HTTP response wrapper.
102///
103/// Most dYdX Indexer API endpoints return data directly without a wrapper,
104/// but some endpoints may use this structure for consistency.
105#[derive(Debug, Serialize, Deserialize)]
106pub struct DydxResponse<T> {
107    /// The typed data returned by the dYdX endpoint.
108    pub data: T,
109}
110
111/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
112///
113/// This client wraps the underlying [`HttpClient`] to handle functionality
114/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
115/// and deserializing responses into dYdX specific data models.
116///
117/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
118/// does NOT require authentication, API keys, or request signing. All endpoints are
119/// publicly accessible.
120pub struct DydxRawHttpClient {
121    base_url: String,
122    client: HttpClient,
123    retry_manager: RetryManager<DydxHttpError>,
124    cancellation_token: CancellationToken,
125    is_testnet: bool,
126}
127
128impl Default for DydxRawHttpClient {
129    fn default() -> Self {
130        Self::new(None, Some(60), None, false, None)
131            .expect("Failed to create default DydxRawHttpClient")
132    }
133}
134
135impl Debug for DydxRawHttpClient {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct(stringify!(DydxRawHttpClient))
138            .field("base_url", &self.base_url)
139            .field("is_testnet", &self.is_testnet)
140            .finish_non_exhaustive()
141    }
142}
143
144impl DydxRawHttpClient {
145    /// Cancel all pending HTTP requests.
146    pub fn cancel_all_requests(&self) {
147        self.cancellation_token.cancel();
148    }
149
150    /// Get the cancellation token for this client.
151    pub fn cancellation_token(&self) -> &CancellationToken {
152        &self.cancellation_token
153    }
154
155    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
156    /// optionally overridden with a custom base URL.
157    ///
158    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the retry manager cannot be created.
163    pub fn new(
164        base_url: Option<String>,
165        timeout_secs: Option<u64>,
166        proxy_url: Option<String>,
167        is_testnet: bool,
168        retry_config: Option<RetryConfig>,
169    ) -> anyhow::Result<Self> {
170        let base_url = if is_testnet {
171            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
172        } else {
173            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
174        };
175
176        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
177
178        // Build headers
179        let mut headers = HashMap::new();
180        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182        let client = HttpClient::new(
183            headers,
184            vec![], // No specific headers to extract from responses
185            vec![], // No keyed quotas (we use a single global quota)
186            Some(*DYDX_REST_QUOTA),
187            timeout_secs,
188            proxy_url,
189        )
190        .map_err(|e| {
191            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192        })?;
193
194        Ok(Self {
195            base_url,
196            client,
197            retry_manager,
198            cancellation_token: CancellationToken::new(),
199            is_testnet,
200        })
201    }
202
203    /// Check if this client is configured for testnet.
204    #[must_use]
205    pub const fn is_testnet(&self) -> bool {
206        self.is_testnet
207    }
208
209    /// Get the base URL being used by this client.
210    #[must_use]
211    pub fn base_url(&self) -> &str {
212        &self.base_url
213    }
214
215    /// Send a request to a dYdX Indexer API endpoint.
216    ///
217    /// **Note**: dYdX Indexer API does not require authentication headers.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if:
222    /// - The HTTP request fails.
223    /// - The response has a non-success HTTP status code.
224    /// - The response body cannot be deserialized to type `T`.
225    /// - The request is canceled.
226    pub async fn send_request<T>(
227        &self,
228        method: Method,
229        endpoint: &str,
230        query_params: Option<&str>,
231    ) -> Result<T, DydxHttpError>
232    where
233        T: DeserializeOwned,
234    {
235        let url = if let Some(params) = query_params {
236            format!("{}{endpoint}?{params}", self.base_url)
237        } else {
238            format!("{}{endpoint}", self.base_url)
239        };
240
241        let operation = || async {
242            let request = self
243                .client
244                .request_with_ustr_keys(
245                    method.clone(),
246                    url.clone(),
247                    None, // No params
248                    None, // No additional headers
249                    None, // No body for GET requests
250                    None, // Use default timeout
251                    None, // No specific rate limit keys (using global quota)
252                )
253                .await
254                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256            // Check for HTTP errors
257            if !request.status.is_success() {
258                return Err(DydxHttpError::HttpStatus {
259                    status: request.status.as_u16(),
260                    message: String::from_utf8_lossy(&request.body).to_string(),
261                });
262            }
263
264            Ok(request)
265        };
266
267        // Retry strategy for dYdX Indexer API:
268        // 1. Network errors: always retry (transient connection issues)
269        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
270        // 3. Client errors (4xx except 429): should NOT be retried
271        let should_retry = |error: &DydxHttpError| -> bool {
272            match error {
273                DydxHttpError::HttpClientError(_) => true,
274                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
275                _ => false,
276            }
277        };
278
279        let create_error = |msg: String| -> DydxHttpError {
280            if msg == "canceled" {
281                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
282            } else {
283                DydxHttpError::ValidationError(msg)
284            }
285        };
286
287        // Execute request with retry logic
288        let response = self
289            .retry_manager
290            .execute_with_retry_with_cancel(
291                endpoint,
292                operation,
293                should_retry,
294                create_error,
295                &self.cancellation_token,
296            )
297            .await?;
298
299        // Deserialize response
300        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
301            error: e.to_string(),
302            body: String::from_utf8_lossy(&response.body).to_string(),
303        })
304    }
305
306    /// Send a POST request to a dYdX Indexer API endpoint.
307    ///
308    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if:
313    /// - The request body cannot be serialized to JSON.
314    /// - The HTTP request fails.
315    /// - The response has a non-success HTTP status code.
316    /// - The response body cannot be deserialized to type `T`.
317    /// - The request is canceled.
318    pub async fn send_post_request<T, B>(
319        &self,
320        endpoint: &str,
321        body: &B,
322    ) -> Result<T, DydxHttpError>
323    where
324        T: DeserializeOwned,
325        B: Serialize,
326    {
327        let url = format!("{}{endpoint}", self.base_url);
328
329        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
330            error: e.to_string(),
331        })?;
332
333        let operation = || async {
334            let request = self
335                .client
336                .request_with_ustr_keys(
337                    Method::POST,
338                    url.clone(),
339                    None, // No params
340                    None, // No additional headers (content-type handled by body)
341                    Some(body_bytes.clone()),
342                    None, // Use default timeout
343                    None, // No specific rate limit keys (using global quota)
344                )
345                .await
346                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
347
348            // Check for HTTP errors
349            if !request.status.is_success() {
350                return Err(DydxHttpError::HttpStatus {
351                    status: request.status.as_u16(),
352                    message: String::from_utf8_lossy(&request.body).to_string(),
353                });
354            }
355
356            Ok(request)
357        };
358
359        // Retry strategy (same as GET requests)
360        let should_retry = |error: &DydxHttpError| -> bool {
361            match error {
362                DydxHttpError::HttpClientError(_) => true,
363                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
364                _ => false,
365            }
366        };
367
368        let create_error = |msg: String| -> DydxHttpError {
369            if msg == "canceled" {
370                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
371            } else {
372                DydxHttpError::ValidationError(msg)
373            }
374        };
375
376        // Execute request with retry logic
377        let response = self
378            .retry_manager
379            .execute_with_retry_with_cancel(
380                endpoint,
381                operation,
382                should_retry,
383                create_error,
384                &self.cancellation_token,
385            )
386            .await?;
387
388        // Deserialize response
389        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
390            error: e.to_string(),
391            body: String::from_utf8_lossy(&response.body).to_string(),
392        })
393    }
394
395    /// Fetch all perpetual markets from dYdX.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the HTTP request fails or response parsing fails.
400    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
401        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
402            .await
403    }
404
405    /// Fetch all instruments and parse them into Nautilus `InstrumentAny` types.
406    ///
407    /// This method fetches all perpetual markets from dYdX and converts them
408    /// into Nautilus instrument definitions using the `parse_instrument_any` function.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if:
413    /// - The HTTP request fails.
414    /// - The response cannot be parsed.
415    /// - Any instrument parsing fails.
416    ///
417    pub async fn fetch_instruments(
418        &self,
419        maker_fee: Option<rust_decimal::Decimal>,
420        taker_fee: Option<rust_decimal::Decimal>,
421    ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
422        use nautilus_core::time::get_atomic_clock_realtime;
423
424        let markets_response = self.get_markets().await?;
425        let ts_init = get_atomic_clock_realtime().get_time_ns();
426
427        let mut instruments = Vec::new();
428        let mut skipped_inactive = 0;
429
430        for (ticker, market) in markets_response.markets {
431            if !super::parse::is_market_active(&market.status) {
432                tracing::debug!(
433                    "Skipping inactive market {ticker} (status: {:?})",
434                    market.status
435                );
436                skipped_inactive += 1;
437                continue;
438            }
439
440            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
441                Ok(instrument) => {
442                    instruments.push(instrument);
443                }
444                Err(e) => {
445                    tracing::error!("Failed to parse instrument {ticker}: {e}");
446                }
447            }
448        }
449
450        if skipped_inactive > 0 {
451            tracing::info!(
452                "Parsed {} instruments, skipped {} inactive",
453                instruments.len(),
454                skipped_inactive
455            );
456        } else {
457            tracing::info!("Parsed {} instruments", instruments.len());
458        }
459
460        Ok(instruments)
461    }
462
463    /// Fetch orderbook for a specific market.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the HTTP request fails or response parsing fails.
468    pub async fn get_orderbook(
469        &self,
470        ticker: &str,
471    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
472        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
473        self.send_request(Method::GET, &endpoint, None).await
474    }
475
476    /// Fetch recent trades for a market.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if the HTTP request fails or response parsing fails.
481    pub async fn get_trades(
482        &self,
483        ticker: &str,
484        limit: Option<u32>,
485    ) -> Result<super::models::TradesResponse, DydxHttpError> {
486        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
487        let query = limit.map(|l| format!("limit={l}"));
488        self.send_request(Method::GET, &endpoint, query.as_deref())
489            .await
490    }
491
492    /// Fetch candles/klines for a market.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the HTTP request fails or response parsing fails.
497    pub async fn get_candles(
498        &self,
499        ticker: &str,
500        resolution: DydxCandleResolution,
501        limit: Option<u32>,
502        from_iso: Option<DateTime<Utc>>,
503        to_iso: Option<DateTime<Utc>>,
504    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
505        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
506        let mut query_parts = vec![format!("resolution={}", resolution)];
507        if let Some(l) = limit {
508            query_parts.push(format!("limit={l}"));
509        }
510        if let Some(from) = from_iso {
511            let from_str = from.to_rfc3339();
512            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
513        }
514        if let Some(to) = to_iso {
515            let to_str = to.to_rfc3339();
516            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
517        }
518        let query = query_parts.join("&");
519        self.send_request(Method::GET, &endpoint, Some(&query))
520            .await
521    }
522
523    /// Fetch subaccount information.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error if the HTTP request fails or response parsing fails.
528    pub async fn get_subaccount(
529        &self,
530        address: &str,
531        subaccount_number: u32,
532    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
533        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
534        self.send_request(Method::GET, &endpoint, None).await
535    }
536
537    /// Fetch fills for a subaccount.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if the HTTP request fails or response parsing fails.
542    pub async fn get_fills(
543        &self,
544        address: &str,
545        subaccount_number: u32,
546        market: Option<&str>,
547        limit: Option<u32>,
548    ) -> Result<super::models::FillsResponse, DydxHttpError> {
549        let endpoint = "/v4/fills";
550        let mut query_parts = vec![
551            format!("address={address}"),
552            format!("subaccountNumber={subaccount_number}"),
553        ];
554        if let Some(m) = market {
555            query_parts.push(format!("market={m}"));
556        }
557        if let Some(l) = limit {
558            query_parts.push(format!("limit={l}"));
559        }
560        let query = query_parts.join("&");
561        self.send_request(Method::GET, endpoint, Some(&query)).await
562    }
563
564    /// Fetch orders for a subaccount.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if the HTTP request fails or response parsing fails.
569    pub async fn get_orders(
570        &self,
571        address: &str,
572        subaccount_number: u32,
573        market: Option<&str>,
574        limit: Option<u32>,
575    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
576        let endpoint = "/v4/orders";
577        let mut query_parts = vec![
578            format!("address={address}"),
579            format!("subaccountNumber={subaccount_number}"),
580        ];
581        if let Some(m) = market {
582            query_parts.push(format!("market={m}"));
583        }
584        if let Some(l) = limit {
585            query_parts.push(format!("limit={l}"));
586        }
587        let query = query_parts.join("&");
588        self.send_request(Method::GET, endpoint, Some(&query)).await
589    }
590
591    /// Fetch transfers for a subaccount.
592    ///
593    /// # Errors
594    ///
595    /// Returns an error if the HTTP request fails or response parsing fails.
596    pub async fn get_transfers(
597        &self,
598        address: &str,
599        subaccount_number: u32,
600        limit: Option<u32>,
601    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
602        let endpoint = "/v4/transfers";
603        let mut query_parts = vec![
604            format!("address={address}"),
605            format!("subaccountNumber={subaccount_number}"),
606        ];
607        if let Some(l) = limit {
608            query_parts.push(format!("limit={l}"));
609        }
610        let query = query_parts.join("&");
611        self.send_request(Method::GET, endpoint, Some(&query)).await
612    }
613
614    /// Get current server time.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the HTTP request fails or response parsing fails.
619    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
620        self.send_request(Method::GET, "/v4/time", None).await
621    }
622
623    /// Get current blockchain height.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if the HTTP request fails or response parsing fails.
628    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
629        self.send_request(Method::GET, "/v4/height", None).await
630    }
631}
632
633/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
634///
635/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
636/// into the Nautilus domain model, following the two-layer pattern established
637/// in OKX, Bybit, and BitMEX adapters.
638///
639/// **Architecture:**
640/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
641/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
642///
643/// The domain client:
644/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
645/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
646/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
647/// - Tracks cache initialization state for optimizations.
648#[derive(Debug)]
649#[cfg_attr(
650    feature = "python",
651    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
652)]
653pub struct DydxHttpClient {
654    /// Raw HTTP client wrapped in Arc for efficient cloning.
655    pub(crate) inner: Arc<DydxRawHttpClient>,
656    /// Instrument cache shared across the adapter using DashMap for thread-safe access.
657    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
658    /// Cached mapping from CLOB pair ID → InstrumentId for efficient lookups.
659    ///
660    /// This is populated from HTTP PerpetualMarket metadata (`clob_pair_id`) alongside
661    /// instrument creation to avoid re-deriving IDs from symbols or other heuristics.
662    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
663    /// Cached mapping from InstrumentId → PerpetualMarket for market params extraction.
664    ///
665    /// This stores the raw market data from the HTTP API for later extraction of
666    /// quantization parameters (atomic_resolution, subticks_per_tick, etc.) needed
667    /// for order submission.
668    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
669    /// Tracks whether the instrument cache has been initialized.
670    cache_initialized: AtomicBool,
671}
672
673impl Clone for DydxHttpClient {
674    fn clone(&self) -> Self {
675        let cache_initialized = AtomicBool::new(false);
676        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
677        if is_initialized {
678            cache_initialized.store(true, Ordering::Release);
679        }
680
681        Self {
682            inner: self.inner.clone(),
683            instruments_cache: self.instruments_cache.clone(),
684            clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
685            market_params_cache: self.market_params_cache.clone(),
686            cache_initialized,
687        }
688    }
689}
690
691impl Default for DydxHttpClient {
692    fn default() -> Self {
693        Self::new(None, Some(60), None, false, None)
694            .expect("Failed to create default DydxHttpClient")
695    }
696}
697
698impl DydxHttpClient {
699    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
700    /// optionally overridden with a custom base URL.
701    ///
702    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
703    /// Order submission and trading operations use gRPC with blockchain transaction signing.
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
708    pub fn new(
709        base_url: Option<String>,
710        timeout_secs: Option<u64>,
711        proxy_url: Option<String>,
712        is_testnet: bool,
713        retry_config: Option<RetryConfig>,
714    ) -> anyhow::Result<Self> {
715        Ok(Self {
716            inner: Arc::new(DydxRawHttpClient::new(
717                base_url,
718                timeout_secs,
719                proxy_url,
720                is_testnet,
721                retry_config,
722            )?),
723            instruments_cache: Arc::new(DashMap::new()),
724            clob_pair_id_to_instrument: Arc::new(DashMap::new()),
725            market_params_cache: Arc::new(DashMap::new()),
726            cache_initialized: AtomicBool::new(false),
727        })
728    }
729
730    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
731    ///
732    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
733    /// for automatic caching, or call `cache_instruments()` manually with the results.
734    ///
735    /// # Errors
736    ///
737    /// Returns an error if the HTTP request or parsing fails.
738    /// Individual instrument parsing errors are logged as warnings.
739    pub async fn request_instruments(
740        &self,
741        symbol: Option<String>,
742        maker_fee: Option<rust_decimal::Decimal>,
743        taker_fee: Option<rust_decimal::Decimal>,
744    ) -> anyhow::Result<Vec<InstrumentAny>> {
745        use nautilus_core::time::get_atomic_clock_realtime;
746
747        let markets_response = self.inner.get_markets().await?;
748        let ts_init = get_atomic_clock_realtime().get_time_ns();
749
750        let mut instruments = Vec::new();
751        let mut skipped_inactive = 0;
752
753        for (ticker, market) in markets_response.markets {
754            // Filter by symbol if specified
755            if let Some(ref sym) = symbol
756                && ticker != *sym
757            {
758                continue;
759            }
760
761            if !super::parse::is_market_active(&market.status) {
762                tracing::debug!(
763                    "Skipping inactive market {ticker} (status: {:?})",
764                    market.status
765                );
766                skipped_inactive += 1;
767                continue;
768            }
769
770            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
771                Ok(instrument) => {
772                    instruments.push(instrument);
773                }
774                Err(e) => {
775                    tracing::error!("Failed to parse instrument {ticker}: {e}");
776                }
777            }
778        }
779
780        if skipped_inactive > 0 {
781            tracing::info!(
782                "Parsed {} instruments, skipped {} inactive",
783                instruments.len(),
784                skipped_inactive
785            );
786        } else {
787            tracing::debug!("Parsed {} instruments", instruments.len());
788        }
789
790        Ok(instruments)
791    }
792
793    /// Fetches instruments from the API and caches them.
794    ///
795    /// This is a convenience method that fetches instruments and populates both
796    /// the symbol-based and CLOB pair ID-based caches.
797    ///
798    /// On success, existing caches are cleared and repopulated atomically.
799    /// On failure, existing caches are preserved (no partial updates).
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the HTTP request fails.
804    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
805        use nautilus_core::time::get_atomic_clock_realtime;
806
807        // Fetch first - preserve existing cache on network failure
808        let markets_response = self.inner.get_markets().await?;
809        let ts_init = get_atomic_clock_realtime().get_time_ns();
810
811        let mut parsed_instruments = Vec::new();
812        let mut parsed_markets = Vec::new();
813        let mut skipped_inactive = 0;
814
815        for (ticker, market) in markets_response.markets {
816            if !super::parse::is_market_active(&market.status) {
817                tracing::debug!(
818                    "Skipping inactive market {ticker} (status: {:?})",
819                    market.status
820                );
821                skipped_inactive += 1;
822                continue;
823            }
824
825            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
826                Ok(instrument) => {
827                    parsed_instruments.push(instrument);
828                    parsed_markets.push(market);
829                }
830                Err(e) => {
831                    tracing::error!("Failed to parse instrument {ticker}: {e}");
832                }
833            }
834        }
835
836        // Only clear and repopulate caches after successful fetch and parse
837        self.instruments_cache.clear();
838        self.clob_pair_id_to_instrument.clear();
839        self.market_params_cache.clear();
840
841        for (instrument, market) in parsed_instruments.iter().zip(parsed_markets.into_iter()) {
842            let instrument_id = instrument.id();
843            let symbol = instrument_id.symbol.inner();
844            self.instruments_cache.insert(symbol, instrument.clone());
845            self.clob_pair_id_to_instrument
846                .insert(market.clob_pair_id, instrument_id);
847            self.market_params_cache.insert(instrument_id, market);
848        }
849
850        if !parsed_instruments.is_empty() {
851            self.cache_initialized.store(true, Ordering::Release);
852        }
853
854        if skipped_inactive > 0 {
855            tracing::info!(
856                "Cached {} instruments, skipped {} inactive",
857                parsed_instruments.len(),
858                skipped_inactive
859            );
860        } else {
861            tracing::info!("Cached {} instruments", parsed_instruments.len());
862        }
863
864        Ok(())
865    }
866
867    /// Caches multiple instruments.
868    ///
869    /// Any existing instruments with the same symbols will be replaced.
870    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
871        for inst in instruments {
872            let symbol = inst.id().symbol.inner();
873            self.instruments_cache.insert(symbol, inst);
874        }
875        self.cache_initialized.store(true, Ordering::Release);
876    }
877
878    /// Caches a single instrument.
879    ///
880    /// Any existing instrument with the same symbol will be replaced.
881    pub fn cache_instrument(&self, instrument: InstrumentAny) {
882        let symbol = instrument.id().symbol.inner();
883        self.instruments_cache.insert(symbol, instrument);
884        self.cache_initialized.store(true, Ordering::Release);
885    }
886
887    /// Gets an instrument from the cache by symbol.
888    #[must_use]
889    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
890        self.instruments_cache
891            .get(symbol)
892            .map(|entry| entry.clone())
893    }
894
895    /// Gets an instrument by CLOB pair ID.
896    ///
897    /// This uses the internal clob_pair_id mapping populated during `fetch_and_cache_instruments()`.
898    #[must_use]
899    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
900        // First get the InstrumentId from clob_pair_id mapping
901        let instrument_id = self
902            .clob_pair_id_to_instrument
903            .get(&clob_pair_id)
904            .map(|entry| *entry)?;
905
906        // Then look up the full instrument by symbol
907        self.get_instrument(&instrument_id.symbol.inner())
908    }
909
910    /// Gets market parameters for order submission from the cached market data.
911    ///
912    /// Returns the quantization parameters needed by OrderBuilder to construct
913    /// properly formatted orders for the dYdX v4 protocol.
914    ///
915    /// # Errors
916    ///
917    /// Returns None if the instrument is not found in the market params cache.
918    #[must_use]
919    pub fn get_market_params(
920        &self,
921        instrument_id: &InstrumentId,
922    ) -> Option<super::models::PerpetualMarket> {
923        self.market_params_cache
924            .get(instrument_id)
925            .map(|entry| entry.clone())
926    }
927
928    /// Requests historical trades for a symbol.
929    ///
930    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
931    /// Results are ordered by creation time descending (newest first).
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the HTTP request fails or response cannot be parsed.
936    pub async fn request_trades(
937        &self,
938        symbol: &str,
939        limit: Option<u32>,
940    ) -> anyhow::Result<super::models::TradesResponse> {
941        self.inner
942            .get_trades(symbol, limit)
943            .await
944            .map_err(Into::into)
945    }
946
947    /// Requests historical candles for a symbol.
948    ///
949    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
950    /// Results are ordered by start time ascending (oldest first).
951    ///
952    /// # Errors
953    ///
954    /// Returns an error if the HTTP request fails or response cannot be parsed.
955    pub async fn request_candles(
956        &self,
957        symbol: &str,
958        resolution: DydxCandleResolution,
959        limit: Option<u32>,
960        from_iso: Option<DateTime<Utc>>,
961        to_iso: Option<DateTime<Utc>>,
962    ) -> anyhow::Result<super::models::CandlesResponse> {
963        self.inner
964            .get_candles(symbol, resolution, limit, from_iso, to_iso)
965            .await
966            .map_err(Into::into)
967    }
968
969    /// Requests historical bars for a symbol and converts to Nautilus Bar objects.
970    ///
971    /// Fetches candle data and converts to Nautilus `Bar` objects using the
972    /// provided `BarType`. Results are ordered by timestamp ascending (oldest first).
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the HTTP request fails, response cannot be parsed,
977    /// or the instrument is not found in the cache.
978    pub async fn request_bars(
979        &self,
980        bar_type: BarType,
981        resolution: DydxCandleResolution,
982        limit: Option<u32>,
983        from_iso: Option<DateTime<Utc>>,
984        to_iso: Option<DateTime<Utc>>,
985    ) -> anyhow::Result<Vec<Bar>> {
986        let instrument_id = bar_type.instrument_id();
987        let symbol = instrument_id.symbol;
988
989        // Get instrument for precision info
990        let instrument = self
991            .get_instrument(&symbol.inner())
992            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
993
994        // dYdX API expects ticker format "BTC-USD", not "BTC-USD-PERP"
995        let ticker = extract_raw_symbol(symbol.as_str());
996        let response = self
997            .request_candles(ticker, resolution, limit, from_iso, to_iso)
998            .await?;
999
1000        let ts_init = get_atomic_clock_realtime().get_time_ns();
1001        let interval_ns = get_bar_interval_ns(&bar_type);
1002
1003        let mut bars = Vec::with_capacity(response.candles.len());
1004
1005        for candle in response.candles {
1006            // Calculate ts_event: startedAt + interval (end of bar)
1007            let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
1008                anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
1009            })?;
1010            let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
1011
1012            let bar = Bar::new(
1013                bar_type,
1014                Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1015                Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1016                Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1017                Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1018                Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1019                ts_event,
1020                ts_init,
1021            );
1022
1023            bars.push(bar);
1024        }
1025
1026        Ok(bars)
1027    }
1028
1029    /// Requests historical trade ticks for a symbol.
1030    ///
1031    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1032    /// `TradeTick` objects. Results are ordered by timestamp descending (newest first).
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the HTTP request fails, response cannot be parsed,
1037    /// or the instrument is not found in the cache.
1038    pub async fn request_trade_ticks(
1039        &self,
1040        instrument_id: InstrumentId,
1041        limit: Option<u32>,
1042    ) -> anyhow::Result<Vec<TradeTick>> {
1043        let symbol = instrument_id.symbol;
1044
1045        let instrument = self
1046            .get_instrument(&symbol.inner())
1047            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1048
1049        let ticker = extract_raw_symbol(symbol.as_str());
1050        let response = self.request_trades(ticker, limit).await?;
1051
1052        let ts_init = get_atomic_clock_realtime().get_time_ns();
1053
1054        let mut trades = Vec::with_capacity(response.trades.len());
1055
1056        for trade in response.trades {
1057            let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1058                anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1059            })?;
1060            let ts_event = UnixNanos::from(ts_event_nanos as u64);
1061
1062            let aggressor_side = match trade.side {
1063                NautilusOrderSide::Buy => AggressorSide::Buyer,
1064                NautilusOrderSide::Sell => AggressorSide::Seller,
1065                NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1066            };
1067
1068            let trade_tick = TradeTick::new(
1069                instrument_id,
1070                Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1071                Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1072                aggressor_side,
1073                TradeId::new(&trade.id),
1074                ts_event,
1075                ts_init,
1076            );
1077
1078            trades.push(trade_tick);
1079        }
1080
1081        Ok(trades)
1082    }
1083
1084    /// Requests an order book snapshot for a symbol.
1085    ///
1086    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1087    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1088    /// with a CLEAR action followed by ADD actions for each level.
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns an error if the HTTP request fails, response cannot be parsed,
1093    /// or the instrument is not found in the cache.
1094    pub async fn request_orderbook_snapshot(
1095        &self,
1096        instrument_id: InstrumentId,
1097    ) -> anyhow::Result<OrderBookDeltas> {
1098        let symbol = instrument_id.symbol;
1099
1100        let instrument = self
1101            .get_instrument(&symbol.inner())
1102            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1103
1104        let ticker = extract_raw_symbol(symbol.as_str());
1105        let response = self.inner.get_orderbook(ticker).await?;
1106
1107        let ts_init = get_atomic_clock_realtime().get_time_ns();
1108
1109        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1110
1111        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1112
1113        for (i, level) in response.bids.iter().enumerate() {
1114            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1115            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1116
1117            let order = BookOrder::new(
1118                NautilusOrderSide::Buy,
1119                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1120                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1121                0,
1122            );
1123
1124            deltas.push(OrderBookDelta::new(
1125                instrument_id,
1126                BookAction::Add,
1127                order,
1128                flags,
1129                0,
1130                ts_init,
1131                ts_init,
1132            ));
1133        }
1134
1135        for (i, level) in response.asks.iter().enumerate() {
1136            let is_last = i == response.asks.len() - 1;
1137            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1138
1139            let order = BookOrder::new(
1140                NautilusOrderSide::Sell,
1141                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1142                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1143                0,
1144            );
1145
1146            deltas.push(OrderBookDelta::new(
1147                instrument_id,
1148                BookAction::Add,
1149                order,
1150                flags,
1151                0,
1152                ts_init,
1153                ts_init,
1154            ));
1155        }
1156
1157        Ok(OrderBookDeltas::new(instrument_id, deltas))
1158    }
1159
1160    /// Exposes raw HTTP client for testing and advanced use cases.
1161    ///
1162    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
1163    /// where low-level API access is needed. Most users should use the domain
1164    /// client methods instead.
1165    #[must_use]
1166    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1167        &self.inner
1168    }
1169
1170    /// Check if this client is configured for testnet.
1171    #[must_use]
1172    pub fn is_testnet(&self) -> bool {
1173        self.inner.is_testnet()
1174    }
1175
1176    /// Get the base URL being used by this client.
1177    #[must_use]
1178    pub fn base_url(&self) -> &str {
1179        self.inner.base_url()
1180    }
1181
1182    /// Check if the instrument cache has been initialized.
1183    #[must_use]
1184    pub fn is_cache_initialized(&self) -> bool {
1185        self.cache_initialized.load(Ordering::Acquire)
1186    }
1187
1188    /// Get the number of instruments currently cached.
1189    #[must_use]
1190    pub fn cached_instruments_count(&self) -> usize {
1191        self.instruments_cache.len()
1192    }
1193
1194    /// Returns a reference to the instruments cache.
1195    #[must_use]
1196    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
1197        &self.instruments_cache
1198    }
1199
1200    /// Get the mapping from CLOB pair ID to `InstrumentId`.
1201    ///
1202    /// This map is populated when instruments are fetched via `request_instruments` /
1203    /// `cache_instruments()` using the Indexer `PerpetualMarket.clob_pair_id` field.
1204    #[must_use]
1205    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
1206        &self.clob_pair_id_to_instrument
1207    }
1208
1209    /// Requests order status reports for a subaccount.
1210    ///
1211    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1212    /// `OrderStatusReport` objects.
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns an error if the HTTP request fails or parsing fails.
1217    pub async fn request_order_status_reports(
1218        &self,
1219        address: &str,
1220        subaccount_number: u32,
1221        account_id: AccountId,
1222        instrument_id: Option<InstrumentId>,
1223    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1224        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1225
1226        // Convert instrument_id to market filter
1227        let market = instrument_id.map(|id| {
1228            let symbol = id.symbol.to_string();
1229            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1230            symbol.trim_end_matches("-PERP").to_string()
1231        });
1232
1233        let orders = self
1234            .inner
1235            .get_orders(address, subaccount_number, market.as_deref(), None)
1236            .await?;
1237
1238        let mut reports = Vec::new();
1239
1240        for order in orders {
1241            // Get instrument by clob_pair_id
1242            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1243                Some(inst) => inst,
1244                None => {
1245                    tracing::warn!(
1246                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1247                        order.id,
1248                        order.clob_pair_id
1249                    );
1250                    continue;
1251                }
1252            };
1253
1254            // Filter by instrument_id if specified
1255            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1256                continue;
1257            }
1258
1259            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1260            {
1261                Ok(report) => reports.push(report),
1262                Err(e) => {
1263                    tracing::warn!("Failed to parse order {}: {e}", order.id);
1264                }
1265            }
1266        }
1267
1268        Ok(reports)
1269    }
1270
1271    /// Requests fill reports for a subaccount.
1272    ///
1273    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1274    /// `FillReport` objects.
1275    ///
1276    /// # Errors
1277    ///
1278    /// Returns an error if the HTTP request fails or parsing fails.
1279    pub async fn request_fill_reports(
1280        &self,
1281        address: &str,
1282        subaccount_number: u32,
1283        account_id: AccountId,
1284        instrument_id: Option<InstrumentId>,
1285    ) -> anyhow::Result<Vec<FillReport>> {
1286        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1287
1288        // Convert instrument_id to market filter
1289        let market = instrument_id.map(|id| {
1290            let symbol = id.symbol.to_string();
1291            symbol.trim_end_matches("-PERP").to_string()
1292        });
1293
1294        let fills_response = self
1295            .inner
1296            .get_fills(address, subaccount_number, market.as_deref(), None)
1297            .await?;
1298
1299        let mut reports = Vec::new();
1300
1301        for fill in fills_response.fills {
1302            // Get instrument by market ticker
1303            let market = &fill.market;
1304            let symbol = Ustr::from(&format!("{market}-PERP"));
1305            let instrument = match self.get_instrument(&symbol) {
1306                Some(inst) => inst,
1307                None => {
1308                    tracing::warn!(
1309                        "Skipping fill {}: no cached instrument for market {}",
1310                        fill.id,
1311                        fill.market
1312                    );
1313                    continue;
1314                }
1315            };
1316
1317            // Filter by instrument_id if specified
1318            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1319                continue;
1320            }
1321
1322            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1323                Ok(report) => reports.push(report),
1324                Err(e) => {
1325                    tracing::warn!("Failed to parse fill {}: {e}", fill.id);
1326                }
1327            }
1328        }
1329
1330        Ok(reports)
1331    }
1332
1333    /// Requests position status reports for a subaccount.
1334    ///
1335    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1336    /// `PositionStatusReport` objects.
1337    ///
1338    /// # Errors
1339    ///
1340    /// Returns an error if the HTTP request fails or parsing fails.
1341    pub async fn request_position_status_reports(
1342        &self,
1343        address: &str,
1344        subaccount_number: u32,
1345        account_id: AccountId,
1346        instrument_id: Option<InstrumentId>,
1347    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1348        let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1349
1350        let subaccount_response = self
1351            .inner
1352            .get_subaccount(address, subaccount_number)
1353            .await?;
1354
1355        let mut reports = Vec::new();
1356
1357        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1358            // Get instrument by market ticker
1359            let symbol = Ustr::from(&format!("{market}-PERP"));
1360            let instrument = match self.get_instrument(&symbol) {
1361                Some(inst) => inst,
1362                None => {
1363                    tracing::warn!(
1364                        "Skipping position: no cached instrument for market {}",
1365                        market
1366                    );
1367                    continue;
1368                }
1369            };
1370
1371            // Filter by instrument_id if specified
1372            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1373                continue;
1374            }
1375
1376            match super::parse::parse_position_status_report(
1377                &position,
1378                &instrument,
1379                account_id,
1380                ts_init,
1381            ) {
1382                Ok(report) => reports.push(report),
1383                Err(e) => {
1384                    tracing::warn!("Failed to parse position for {}: {e}", market);
1385                }
1386            }
1387        }
1388
1389        Ok(reports)
1390    }
1391}
1392
1393#[cfg(test)]
1394mod tests {
1395    use nautilus_core::UnixNanos;
1396    use rstest::rstest;
1397
1398    use super::*;
1399    use crate::http::error;
1400
1401    #[tokio::test]
1402    async fn test_raw_client_creation() {
1403        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1404        assert!(client.is_ok());
1405
1406        let client = client.unwrap();
1407        assert!(!client.is_testnet());
1408        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1409    }
1410
1411    #[tokio::test]
1412    async fn test_raw_client_testnet() {
1413        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1414        assert!(client.is_ok());
1415
1416        let client = client.unwrap();
1417        assert!(client.is_testnet());
1418        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1419    }
1420
1421    #[tokio::test]
1422    async fn test_domain_client_creation() {
1423        let client = DydxHttpClient::new(None, Some(30), None, false, None);
1424        assert!(client.is_ok());
1425
1426        let client = client.unwrap();
1427        assert!(!client.is_testnet());
1428        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1429        assert!(!client.is_cache_initialized());
1430        assert_eq!(client.cached_instruments_count(), 0);
1431    }
1432
1433    #[tokio::test]
1434    async fn test_domain_client_testnet() {
1435        let client = DydxHttpClient::new(None, Some(30), None, true, None);
1436        assert!(client.is_ok());
1437
1438        let client = client.unwrap();
1439        assert!(client.is_testnet());
1440        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1441    }
1442
1443    #[tokio::test]
1444    async fn test_domain_client_default() {
1445        let client = DydxHttpClient::default();
1446        assert!(!client.is_testnet());
1447        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1448        assert!(!client.is_cache_initialized());
1449    }
1450
1451    #[tokio::test]
1452    async fn test_domain_client_clone() {
1453        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1454
1455        // Clone before initialization
1456        let cloned = client.clone();
1457        assert!(!cloned.is_cache_initialized());
1458
1459        // Simulate cache initialization
1460        client.cache_initialized.store(true, Ordering::Release);
1461
1462        // Clone after initialization
1463        #[allow(clippy::redundant_clone)]
1464        let cloned_after = client.clone();
1465        assert!(cloned_after.is_cache_initialized());
1466    }
1467
1468    #[rstest]
1469    fn test_domain_client_cache_instrument() {
1470        use nautilus_model::{
1471            identifiers::{InstrumentId, Symbol},
1472            instruments::CryptoPerpetual,
1473            types::{Currency, Price, Quantity},
1474        };
1475
1476        let client = DydxHttpClient::default();
1477        assert_eq!(client.cached_instruments_count(), 0);
1478
1479        // Create a test instrument
1480        let instrument_id =
1481            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
1482        let price = Price::from("1.0");
1483        let size = Quantity::from("0.001");
1484        let instrument = CryptoPerpetual::new(
1485            instrument_id,
1486            Symbol::from("BTC-USD"),
1487            Currency::BTC(),
1488            Currency::USD(),
1489            Currency::USD(),
1490            false,
1491            price.precision,
1492            size.precision,
1493            price,
1494            size,
1495            None,
1496            None,
1497            None,
1498            None,
1499            None,
1500            None,
1501            None,
1502            None,
1503            None,
1504            None,
1505            None,
1506            None,
1507            UnixNanos::default(),
1508            UnixNanos::default(),
1509        );
1510
1511        // Cache the instrument
1512        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
1513        assert_eq!(client.cached_instruments_count(), 1);
1514        assert!(client.is_cache_initialized());
1515
1516        // Retrieve it
1517        let btc_usd = Ustr::from("BTC-USD");
1518        let cached = client.get_instrument(&btc_usd);
1519        assert!(cached.is_some());
1520    }
1521
1522    #[rstest]
1523    fn test_domain_client_get_instrument_not_found() {
1524        let client = DydxHttpClient::default();
1525        let eth_usd = Ustr::from("ETH-USD");
1526        let result = client.get_instrument(&eth_usd);
1527        assert!(result.is_none());
1528    }
1529
1530    #[tokio::test]
1531    async fn test_http_timeout_respects_configuration_and_does_not_block() {
1532        use axum::{Router, routing::get};
1533        use tokio::net::TcpListener;
1534
1535        async fn slow_handler() -> &'static str {
1536            // Sleep longer than the configured HTTP timeout.
1537            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1538            "ok"
1539        }
1540
1541        let router = Router::new().route("/v4/slow", get(slow_handler));
1542
1543        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1544        let addr = listener.local_addr().unwrap();
1545
1546        tokio::spawn(async move {
1547            axum::serve(listener, router.into_make_service())
1548                .await
1549                .unwrap();
1550        });
1551
1552        let base_url = format!("http://{addr}");
1553
1554        // Configure a small operation timeout and no retries so the request
1555        // fails quickly even though the handler sleeps for 5 seconds.
1556        let retry_config = RetryConfig {
1557            max_retries: 0,
1558            initial_delay_ms: 0,
1559            max_delay_ms: 0,
1560            backoff_factor: 1.0,
1561            jitter_ms: 0,
1562            operation_timeout_ms: Some(500),
1563            immediate_first: true,
1564            max_elapsed_ms: Some(1_000),
1565        };
1566
1567        // Keep HTTP client timeout at a typical value; rely on RetryManager
1568        // operation timeout to enforce non-blocking behavior.
1569        let client =
1570            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1571                .unwrap();
1572
1573        let start = std::time::Instant::now();
1574        let result: Result<serde_json::Value, error::DydxHttpError> =
1575            client.send_request(Method::GET, "/v4/slow", None).await;
1576        let elapsed = start.elapsed();
1577
1578        // Request should fail (timeout or client error), but without blocking the thread
1579        // for the full handler duration.
1580        assert!(result.is_err());
1581        assert!(elapsed < std::time::Duration::from_secs(3));
1582    }
1583}