nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46        TimeInForce, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65    error::{BitmexErrorResponse, BitmexHttpError},
66    models::{
67        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69    },
70    query::{
71        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74        PostPositionLeverageParams, PutOrderParams,
75    },
76};
77use crate::{
78    common::{
79        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80        credential::Credential,
81        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82        parse::{parse_account_state, quantity_to_u32},
83    },
84    http::{
85        parse::{
86            InstrumentParseResult, parse_fill_report, parse_instrument_any,
87            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88        },
89        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90    },
91    websocket::messages::BitmexMarginMsg,
92};
93
94/// Default BitMEX REST API rate limits.
95///
96/// BitMEX implements a dual-layer rate limiting system:
97/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
98/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
99const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
100const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
101const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
102
103const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
104const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
105
106static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
107    vec![
108        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
109        Ustr::from(BITMEX_MINUTE_RATE_KEY),
110    ]
111});
112
113/// Represents a BitMEX HTTP response.
114#[derive(Debug, Serialize, Deserialize)]
115pub struct BitmexResponse<T> {
116    /// The typed data returned by the BitMEX endpoint.
117    pub data: Vec<T>,
118}
119
120/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
121///
122/// This client wraps the underlying [`HttpClient`] to handle functionality
123/// specific to BitMEX, such as request signing (for authenticated endpoints),
124/// forming request URLs, and deserializing responses into specific data models.
125///
126/// # Connection Management
127///
128/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
129/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
130/// reused for subsequent requests to minimize latency.
131///
132/// # Rate Limiting
133///
134/// BitMEX enforces the following rate limits:
135/// - 120 requests per minute for authenticated users (30 for unauthenticated).
136/// - 10 requests per second burst limit for certain endpoints (order management).
137///
138/// The client automatically respects these limits through the configured quota.
139#[derive(Debug, Clone)]
140pub struct BitmexRawHttpClient {
141    base_url: String,
142    client: HttpClient,
143    credential: Option<Credential>,
144    recv_window_ms: u64,
145    retry_manager: RetryManager<BitmexHttpError>,
146    cancellation_token: CancellationToken,
147}
148
149impl Default for BitmexRawHttpClient {
150    fn default() -> Self {
151        Self::new(None, Some(60), None, None, None, None, None, None, None)
152            .expect("Failed to create default BitmexHttpInnerClient")
153    }
154}
155
156impl BitmexRawHttpClient {
157    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
158    /// optionally overridden with a custom base URL.
159    ///
160    /// This version of the client has **no credentials**, so it can only
161    /// call publicly accessible endpoints.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the retry manager cannot be created.
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        base_url: Option<String>,
169        timeout_secs: Option<u64>,
170        max_retries: Option<u32>,
171        retry_delay_ms: Option<u64>,
172        retry_delay_max_ms: Option<u64>,
173        recv_window_ms: Option<u64>,
174        max_requests_per_second: Option<u32>,
175        max_requests_per_minute: Option<u32>,
176        proxy_url: Option<String>,
177    ) -> Result<Self, BitmexHttpError> {
178        let retry_config = RetryConfig {
179            max_retries: max_retries.unwrap_or(3),
180            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182            backoff_factor: 2.0,
183            jitter_ms: 1000,
184            operation_timeout_ms: Some(60_000),
185            immediate_first: false,
186            max_elapsed_ms: Some(180_000),
187        };
188
189        let retry_manager = RetryManager::new(retry_config);
190
191        let max_req_per_sec =
192            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193        let max_req_per_min =
194            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196        Ok(Self {
197            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198            client: HttpClient::new(
199                Self::default_headers(),
200                vec![],
201                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202                Some(Self::default_quota(max_req_per_sec)),
203                timeout_secs,
204                proxy_url,
205            )
206            .map_err(|e| {
207                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208            })?,
209            credential: None,
210            recv_window_ms: recv_window_ms.unwrap_or(10_000),
211            retry_manager,
212            cancellation_token: CancellationToken::new(),
213        })
214    }
215
216    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
217    /// for authenticated requests, optionally using a custom base URL.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the retry manager cannot be created.
222    #[allow(clippy::too_many_arguments)]
223    pub fn with_credentials(
224        api_key: String,
225        api_secret: String,
226        base_url: String,
227        timeout_secs: Option<u64>,
228        max_retries: Option<u32>,
229        retry_delay_ms: Option<u64>,
230        retry_delay_max_ms: Option<u64>,
231        recv_window_ms: Option<u64>,
232        max_requests_per_second: Option<u32>,
233        max_requests_per_minute: Option<u32>,
234        proxy_url: Option<String>,
235    ) -> Result<Self, BitmexHttpError> {
236        let retry_config = RetryConfig {
237            max_retries: max_retries.unwrap_or(3),
238            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240            backoff_factor: 2.0,
241            jitter_ms: 1000,
242            operation_timeout_ms: Some(60_000),
243            immediate_first: false,
244            max_elapsed_ms: Some(180_000),
245        };
246
247        let retry_manager = RetryManager::new(retry_config);
248
249        let max_req_per_sec =
250            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251        let max_req_per_min =
252            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254        Ok(Self {
255            base_url,
256            client: HttpClient::new(
257                Self::default_headers(),
258                vec![],
259                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260                Some(Self::default_quota(max_req_per_sec)),
261                timeout_secs,
262                proxy_url,
263            )
264            .map_err(|e| {
265                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266            })?,
267            credential: Some(Credential::new(api_key, api_secret)),
268            recv_window_ms: recv_window_ms.unwrap_or(10_000),
269            retry_manager,
270            cancellation_token: CancellationToken::new(),
271        })
272    }
273
274    fn default_headers() -> HashMap<String, String> {
275        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276    }
277
278    fn default_quota(max_requests_per_second: u32) -> Quota {
279        Quota::per_second(
280            NonZeroU32::new(max_requests_per_second)
281                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282        )
283    }
284
285    fn rate_limiter_quotas(
286        max_requests_per_second: u32,
287        max_requests_per_minute: u32,
288    ) -> Vec<(String, Quota)> {
289        let per_sec_quota = Quota::per_second(
290            NonZeroU32::new(max_requests_per_second)
291                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292        );
293        let per_min_quota =
294            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296            }));
297
298        vec![
299            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301        ]
302    }
303
304    fn rate_limit_keys() -> Vec<Ustr> {
305        RATE_LIMIT_KEYS.clone()
306    }
307
308    /// Cancel all pending HTTP requests.
309    pub fn cancel_all_requests(&self) {
310        self.cancellation_token.cancel();
311    }
312
313    /// Get the cancellation token for this client.
314    pub fn cancellation_token(&self) -> &CancellationToken {
315        &self.cancellation_token
316    }
317
318    fn sign_request(
319        &self,
320        method: &Method,
321        endpoint: &str,
322        body: Option<&[u8]>,
323    ) -> Result<HashMap<String, String>, BitmexHttpError> {
324        let credential = self
325            .credential
326            .as_ref()
327            .ok_or(BitmexHttpError::MissingCredentials)?;
328
329        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332        let full_path = if endpoint.starts_with("/api/v1") {
333            endpoint.to_string()
334        } else {
335            format!("/api/v1{endpoint}")
336        };
337
338        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340        let mut headers = HashMap::new();
341        headers.insert("api-expires".to_string(), expires.to_string());
342        headers.insert("api-key".to_string(), credential.api_key.to_string());
343        headers.insert("api-signature".to_string(), signature);
344
345        // Add Content-Type header for form-encoded body
346        if body.is_some()
347            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348        {
349            headers.insert(
350                "Content-Type".to_string(),
351                "application/x-www-form-urlencoded".to_string(),
352            );
353        }
354
355        Ok(headers)
356    }
357
358    async fn send_request<T: DeserializeOwned, P: Serialize>(
359        &self,
360        method: Method,
361        endpoint: &str,
362        params: Option<&P>,
363        body: Option<Vec<u8>>,
364        authenticate: bool,
365    ) -> Result<T, BitmexHttpError> {
366        let endpoint = endpoint.to_string();
367        let method_clone = method.clone();
368        let body_clone = body.clone();
369
370        // Serialize params before closure to avoid reference lifetime issues
371        // Query params are used with GET and DELETE methods
372        let params_str = if method == Method::GET || method == Method::DELETE {
373            params
374                .map(serde_urlencoded::to_string)
375                .transpose()
376                .map_err(|e| {
377                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378                })?
379        } else {
380            None
381        };
382
383        let full_endpoint = match params_str {
384            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385            _ => endpoint.clone(),
386        };
387
388        let url = format!("{}{}", self.base_url, full_endpoint);
389
390        let operation = || {
391            let url = url.clone();
392            let method = method_clone.clone();
393            let body = body_clone.clone();
394            let full_endpoint = full_endpoint.clone();
395
396            async move {
397                let headers = if authenticate {
398                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399                } else {
400                    None
401                };
402
403                let rate_keys = Self::rate_limit_keys();
404                let resp = self
405                    .client
406                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407                    .await?;
408
409                if resp.status.is_success() {
410                    serde_json::from_slice(&resp.body).map_err(Into::into)
411                } else if let Ok(error_resp) =
412                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413                {
414                    Err(error_resp.into())
415                } else {
416                    Err(BitmexHttpError::UnexpectedStatus {
417                        status: StatusCode::from_u16(resp.status.as_u16())
418                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419                        body: String::from_utf8_lossy(&resp.body).to_string(),
420                    })
421                }
422            }
423        };
424
425        // Retry strategy based on BitMEX error responses and HTTP status codes:
426        //
427        // 1. Network errors: always retry (transient connection issues).
428        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
429        // 3. BitMEX JSON errors with specific handling:
430        //    - "RateLimitError": explicit rate limit error from BitMEX.
431        //    - "HTTPError": generic error name used by BitMEX for various issues
432        //      Only retry if message contains "rate limit" to avoid retrying
433        //      non-transient errors like authentication failures, validation errors,
434        //      insufficient balance, etc. which also return as "HTTPError".
435        //
436        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
437        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
438        // be retried. We only retry when the message explicitly mentions rate limiting.
439        //
440        // See tests in tests/http.rs for retry behavior validation.
441        let should_retry = |error: &BitmexHttpError| -> bool {
442            match error {
443                BitmexHttpError::NetworkError(_) => true,
444                BitmexHttpError::UnexpectedStatus { status, .. } => {
445                    status.as_u16() >= 500 || status.as_u16() == 429
446                }
447                BitmexHttpError::BitmexError {
448                    error_name,
449                    message,
450                } => {
451                    error_name == "RateLimitError"
452                        || (error_name == "HTTPError"
453                            && message.to_lowercase().contains("rate limit"))
454                }
455                _ => false,
456            }
457        };
458
459        let create_error = |msg: String| -> BitmexHttpError {
460            if msg == "canceled" {
461                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462            } else {
463                BitmexHttpError::NetworkError(msg)
464            }
465        };
466
467        self.retry_manager
468            .execute_with_retry_with_cancel(
469                endpoint.as_str(),
470                operation,
471                should_retry,
472                create_error,
473                &self.cancellation_token,
474            )
475            .await
476    }
477
478    /// Get all instruments.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
483    pub async fn get_instruments(
484        &self,
485        active_only: bool,
486    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487        let path = if active_only {
488            "/instrument/active"
489        } else {
490            "/instrument"
491        };
492        self.send_request::<_, ()>(Method::GET, path, None, None, false)
493            .await
494    }
495
496    /// Requests the current server time from BitMEX.
497    ///
498    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
499    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the HTTP request fails or if the response body
504    /// cannot be parsed into [`BitmexApiInfo`].
505    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506        let response: BitmexApiInfo = self
507            .send_request::<_, ()>(Method::GET, "", None, None, false)
508            .await?;
509        Ok(response.timestamp)
510    }
511
512    /// Get the instrument definition for the specified symbol.
513    ///
514    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
515    /// a single symbol is requested. This helper returns the first element of
516    /// that array and yields `Ok(None)` when the venue returns an empty list
517    /// (e.g. unknown symbol).
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the request fails or the payload cannot be deserialized.
522    pub async fn get_instrument(
523        &self,
524        symbol: &str,
525    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526        let path = &format!("/instrument?symbol={symbol}");
527        let instruments: Vec<BitmexInstrument> = self
528            .send_request::<_, ()>(Method::GET, path, None, None, false)
529            .await?;
530
531        Ok(instruments.into_iter().next())
532    }
533
534    /// Get user wallet information.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
539    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540        let endpoint = "/user/wallet";
541        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542            .await
543    }
544
545    /// Get user margin information.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
550    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551        let path = format!("/user/margin?currency={currency}");
552        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553            .await
554    }
555
556    /// Get historical trades.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
561    ///
562    /// # Panics
563    ///
564    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
565    pub async fn get_trades(
566        &self,
567        params: GetTradeParams,
568    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569        self.send_request(Method::GET, "/trade", Some(&params), None, true)
570            .await
571    }
572
573    /// Get bucketed (aggregated) trade data.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
578    pub async fn get_trade_bucketed(
579        &self,
580        params: GetTradeBucketedParams,
581    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
583            .await
584    }
585
586    /// Get user orders.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
591    ///
592    /// # Panics
593    ///
594    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
595    pub async fn get_orders(
596        &self,
597        params: GetOrderParams,
598    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599        self.send_request(Method::GET, "/order", Some(&params), None, true)
600            .await
601    }
602
603    /// Place a new order.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
608    ///
609    /// # Panics
610    ///
611    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
612    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613        // BitMEX spec requires form-encoded body for POST /order
614        let body = serde_urlencoded::to_string(&params)
615            .map_err(|e| {
616                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617            })?
618            .into_bytes();
619        let path = "/order";
620        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621            .await
622    }
623
624    /// Cancel user orders.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
629    ///
630    /// # Panics
631    ///
632    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
633    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634        // BitMEX spec requires form-encoded body for DELETE /order
635        let body = serde_urlencoded::to_string(&params)
636            .map_err(|e| {
637                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638            })?
639            .into_bytes();
640        let path = "/order";
641        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642            .await
643    }
644
645    /// Amend an existing order.
646    ///
647    /// # Errors
648    ///
649    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
650    ///
651    /// # Panics
652    ///
653    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
654    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655        // BitMEX spec requires form-encoded body for PUT /order
656        let body = serde_urlencoded::to_string(&params)
657            .map_err(|e| {
658                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659            })?
660            .into_bytes();
661        let path = "/order";
662        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663            .await
664    }
665
666    /// Cancel all orders.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
671    ///
672    /// # Panics
673    ///
674    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
675    ///
676    /// # References
677    ///
678    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
679    pub async fn cancel_all_orders(
680        &self,
681        params: DeleteAllOrdersParams,
682    ) -> Result<Value, BitmexHttpError> {
683        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
684            .await
685    }
686
687    /// Get user executions.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
692    ///
693    /// # Panics
694    ///
695    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
696    pub async fn get_executions(
697        &self,
698        params: GetExecutionParams,
699    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700        let query = serde_urlencoded::to_string(&params).map_err(|e| {
701            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702        })?;
703        let path = format!("/execution/tradeHistory?{query}");
704        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705            .await
706    }
707
708    /// Get user positions.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
713    ///
714    /// # Panics
715    ///
716    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
717    pub async fn get_positions(
718        &self,
719        params: GetPositionParams,
720    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721        self.send_request(Method::GET, "/position", Some(&params), None, true)
722            .await
723    }
724
725    /// Update position leverage.
726    ///
727    /// # Errors
728    ///
729    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
730    ///
731    /// # Panics
732    ///
733    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
734    pub async fn update_position_leverage(
735        &self,
736        params: PostPositionLeverageParams,
737    ) -> Result<BitmexPosition, BitmexHttpError> {
738        // BitMEX spec requires form-encoded body for POST endpoints
739        let body = serde_urlencoded::to_string(&params)
740            .map_err(|e| {
741                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742            })?
743            .into_bytes();
744        let path = "/position/leverage";
745        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746            .await
747    }
748}
749
750/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
751///
752/// This is the high-level client that wraps the inner client and provides
753/// Nautilus-specific functionality for trading operations.
754#[derive(Debug)]
755#[cfg_attr(
756    feature = "python",
757    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
758)]
759pub struct BitmexHttpClient {
760    inner: Arc<BitmexRawHttpClient>,
761    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
762    cache_initialized: AtomicBool,
763}
764
765impl Clone for BitmexHttpClient {
766    fn clone(&self) -> Self {
767        let cache_initialized = AtomicBool::new(false);
768
769        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770        if is_initialized {
771            cache_initialized.store(true, Ordering::Release);
772        }
773
774        Self {
775            inner: self.inner.clone(),
776            instruments_cache: self.instruments_cache.clone(),
777            cache_initialized,
778        }
779    }
780}
781
782impl Default for BitmexHttpClient {
783    fn default() -> Self {
784        Self::new(
785            None,
786            None,
787            None,
788            false,
789            Some(60),
790            None,
791            None,
792            None,
793            None,
794            None,
795            None,
796            None, // proxy_url
797        )
798        .expect("Failed to create default BitmexHttpClient")
799    }
800}
801
802impl BitmexHttpClient {
803    /// Creates a new [`BitmexHttpClient`] instance.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the HTTP client cannot be created.
808    #[allow(clippy::too_many_arguments)]
809    pub fn new(
810        base_url: Option<String>,
811        api_key: Option<String>,
812        api_secret: Option<String>,
813        testnet: bool,
814        timeout_secs: Option<u64>,
815        max_retries: Option<u32>,
816        retry_delay_ms: Option<u64>,
817        retry_delay_max_ms: Option<u64>,
818        recv_window_ms: Option<u64>,
819        max_requests_per_second: Option<u32>,
820        max_requests_per_minute: Option<u32>,
821        proxy_url: Option<String>,
822    ) -> Result<Self, BitmexHttpError> {
823        // Determine the base URL
824        let url = base_url.unwrap_or_else(|| {
825            if testnet {
826                BITMEX_HTTP_TESTNET_URL.to_string()
827            } else {
828                BITMEX_HTTP_URL.to_string()
829            }
830        });
831
832        let inner = match (api_key, api_secret) {
833            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834                key,
835                secret,
836                url,
837                timeout_secs,
838                max_retries,
839                retry_delay_ms,
840                retry_delay_max_ms,
841                recv_window_ms,
842                max_requests_per_second,
843                max_requests_per_minute,
844                proxy_url,
845            )?,
846            _ => BitmexRawHttpClient::new(
847                Some(url),
848                timeout_secs,
849                max_retries,
850                retry_delay_ms,
851                retry_delay_max_ms,
852                recv_window_ms,
853                max_requests_per_second,
854                max_requests_per_minute,
855                proxy_url,
856            )?,
857        };
858
859        Ok(Self {
860            inner: Arc::new(inner),
861            instruments_cache: Arc::new(DashMap::new()),
862            cache_initialized: AtomicBool::new(false),
863        })
864    }
865
866    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
867    /// the default BitMEX HTTP base URL.
868    ///
869    /// # Errors
870    ///
871    /// Returns an error if required environment variables are not set or invalid.
872    pub fn from_env() -> anyhow::Result<Self> {
873        Self::with_credentials(
874            None, None, None, None, None, None, None, None, None, None, None,
875        )
876        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
877    }
878
879    /// Creates a new [`BitmexHttpClient`] configured with credentials
880    /// for authenticated requests.
881    ///
882    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
883    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if one credential is provided without the other.
888    #[allow(clippy::too_many_arguments)]
889    pub fn with_credentials(
890        api_key: Option<String>,
891        api_secret: Option<String>,
892        base_url: Option<String>,
893        timeout_secs: Option<u64>,
894        max_retries: Option<u32>,
895        retry_delay_ms: Option<u64>,
896        retry_delay_max_ms: Option<u64>,
897        recv_window_ms: Option<u64>,
898        max_requests_per_second: Option<u32>,
899        max_requests_per_minute: Option<u32>,
900        proxy_url: Option<String>,
901    ) -> anyhow::Result<Self> {
902        // Determine testnet from URL first to select correct environment variables
903        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905        // Choose environment variables based on testnet flag
906        let (key_var, secret_var) = if testnet {
907            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908        } else {
909            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910        };
911
912        let api_key = get_or_env_var_opt(api_key, key_var);
913        let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915        // If we're trying to create an authenticated client, we need both key and secret
916        if api_key.is_some() && api_secret.is_none() {
917            anyhow::bail!("{secret_var} is required when {key_var} is provided");
918        }
919        if api_key.is_none() && api_secret.is_some() {
920            anyhow::bail!("{key_var} is required when {secret_var} is provided");
921        }
922
923        Self::new(
924            base_url,
925            api_key,
926            api_secret,
927            testnet,
928            timeout_secs,
929            max_retries,
930            retry_delay_ms,
931            retry_delay_max_ms,
932            recv_window_ms,
933            max_requests_per_second,
934            max_requests_per_minute,
935            proxy_url,
936        )
937        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938    }
939
940    /// Returns the base url being used by the client.
941    #[must_use]
942    pub fn base_url(&self) -> &str {
943        self.inner.base_url.as_str()
944    }
945
946    /// Returns the public API key being used by the client.
947    #[must_use]
948    pub fn api_key(&self) -> Option<&str> {
949        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
950    }
951
952    /// Returns a masked version of the API key for logging purposes.
953    #[must_use]
954    pub fn api_key_masked(&self) -> Option<String> {
955        self.inner.credential.as_ref().map(|c| c.api_key_masked())
956    }
957
958    /// Requests the current server time from BitMEX.
959    ///
960    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
965    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
966        self.inner.get_server_time().await
967    }
968
969    /// Generates a timestamp for initialization.
970    fn generate_ts_init(&self) -> UnixNanos {
971        get_atomic_clock_realtime().get_time_ns()
972    }
973
    /// Check if the order has a contingency type that requires linking.
    ///
    /// Returns `true` for `Oco`, `Oto` and `Ouo`, and `false` for every other variant.
    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
        matches!(
            contingency_type,
            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
        )
    }
981
    /// Check if the order is a parent in contingency relationships.
    ///
    /// Returns `true` for `Oco` and `Oto` only; `Ouo` is contingent but is never
    /// treated as a parent here.
    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
        matches!(
            contingency_type,
            ContingencyType::Oco | ContingencyType::Oto
        )
    }
989
    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
    ///
    /// Works in two passes over `reports`; reports without a client order ID are ignored
    /// by both.
    ///
    /// 1. Collect the client order IDs that share an `order_list_id` (any contingency
    ///    type), and separately the contingent ones that share a client order ID prefix
    ///    (everything before the last `-`). The first OCO/OTO report seen in a group is
    ///    remembered as that group's parent.
    /// 2. For each contingent report that has no linked IDs yet, link it to its peers from
    ///    the order list group, falling back to the prefix group. A report that is not the
    ///    group's parent gets `parent_order_id` set and the parent moved to the front of
    ///    its linked IDs.
    ///
    /// A contingent report for which neither grouping yields a peer is downgraded to
    /// `ContingencyType::NoContingency`, with its parent and linked IDs cleared.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // Pass 1: build the groups and record the first parent seen in each
        for report in reports.iter() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    // `or_insert` keeps the first parent encountered for the list
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            // Prefix grouping only considers contingent orders whose ID contains a `-`
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Pass 2: fill in linked IDs, preferring the order list group over the prefix group
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Linked IDs that are already present are left untouched
            if report.linked_order_ids.is_some() {
                continue;
            }

            // Only process contingent orders
            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                // Peers are every other member of the same order list
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id != *parent_id {
                            // Key is 0 for the parent and 1 otherwise, so the stable sort
                            // moves the parent to the front and keeps the remaining order
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        order_list_id = ?order_list_id,
                        contingency_type = ?report.contingency_type,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids sourced from order list id",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                // The group held only this order: fall through to the prefix grouping
                tracing::trace!(
                    client_order_id = ?client_order_id,
                    order_list_id = ?order_list_id,
                    contingency_type = ?report.contingency_type,
                    order_list_group = ?group,
                    "BitMEX order list id group had no peers",
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Fallback: same procedure, using the client order ID prefix as the group key
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id != *parent_id {
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        contingency_type = ?report.contingency_type,
                        base = base,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids constructed from client order id prefix",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                tracing::trace!(
                    client_order_id = ?client_order_id,
                    contingency_type = ?report.contingency_type,
                    base = base,
                    prefix_group = ?group,
                    "BitMEX client order id prefix group had no peers",
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // Reached only when neither grouping produced a peer. Non-contingent reports
            // were already skipped above, so this check repeats that guard.
            if Self::is_contingent_order(report.contingency_type) {
                tracing::warn!(
                    client_order_id = ?report.client_order_id,
                    order_list_id = ?report.order_list_id,
                    contingency_type = ?report.contingency_type,
                    "BitMEX order status report missing linked ids after grouping",
                );
                // Without peers the order is reported as a standalone order
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1147
1148    /// Cancel all pending HTTP requests.
1149    pub fn cancel_all_requests(&self) {
1150        self.inner.cancel_all_requests();
1151    }
1152
1153    /// Get the cancellation token for this client.
1154    pub fn cancellation_token(&self) -> CancellationToken {
1155        self.inner.cancellation_token().clone()
1156    }
1157
1158    /// Caches a single instrument.
1159    ///
1160    /// Any existing instrument with the same symbol will be replaced.
1161    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1162        self.instruments_cache
1163            .insert(instrument.raw_symbol().inner(), instrument);
1164        self.cache_initialized.store(true, Ordering::Release);
1165    }
1166
1167    /// Gets an instrument from the cache by symbol.
1168    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169        self.instruments_cache
1170            .get(symbol)
1171            .map(|entry| entry.value().clone())
1172    }
1173
1174    /// Request a single instrument and parse it into a Nautilus type.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1179    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1180    /// cannot be converted into a Nautilus `Instrument`.
1181    pub async fn request_instrument(
1182        &self,
1183        instrument_id: InstrumentId,
1184    ) -> anyhow::Result<Option<InstrumentAny>> {
1185        let response = self
1186            .inner
1187            .get_instrument(instrument_id.symbol.as_str())
1188            .await?;
1189
1190        let instrument = match response {
1191            Some(instrument) => instrument,
1192            None => return Ok(None),
1193        };
1194
1195        let ts_init = self.generate_ts_init();
1196
1197        match parse_instrument_any(&instrument, ts_init) {
1198            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199            InstrumentParseResult::Unsupported {
1200                symbol,
1201                instrument_type,
1202            } => {
1203                tracing::debug!(
1204                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205                );
1206                Ok(None)
1207            }
1208            InstrumentParseResult::Inactive { symbol, state } => {
1209                tracing::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1210                Ok(None)
1211            }
1212            InstrumentParseResult::Failed {
1213                symbol,
1214                instrument_type,
1215                error,
1216            } => {
1217                tracing::error!(
1218                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1219                );
1220                Ok(None)
1221            }
1222        }
1223    }
1224
1225    /// Request all available instruments and parse them into Nautilus types.
1226    ///
1227    /// # Errors
1228    ///
1229    /// Returns an error if the HTTP request fails or parsing fails.
1230    pub async fn request_instruments(
1231        &self,
1232        active_only: bool,
1233    ) -> anyhow::Result<Vec<InstrumentAny>> {
1234        let instruments = self.inner.get_instruments(active_only).await?;
1235        let ts_init = self.generate_ts_init();
1236
1237        let mut parsed_instruments = Vec::new();
1238        let mut skipped_count = 0;
1239        let mut inactive_count = 0;
1240        let mut failed_count = 0;
1241        let total_count = instruments.len();
1242
1243        for inst in instruments {
1244            match parse_instrument_any(&inst, ts_init) {
1245                InstrumentParseResult::Ok(instrument_any) => {
1246                    parsed_instruments.push(*instrument_any);
1247                }
1248                InstrumentParseResult::Unsupported {
1249                    symbol,
1250                    instrument_type,
1251                } => {
1252                    skipped_count += 1;
1253                    tracing::debug!(
1254                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1255                    );
1256                }
1257                InstrumentParseResult::Inactive { symbol, state } => {
1258                    inactive_count += 1;
1259                    tracing::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1260                }
1261                InstrumentParseResult::Failed {
1262                    symbol,
1263                    instrument_type,
1264                    error,
1265                } => {
1266                    failed_count += 1;
1267                    tracing::error!(
1268                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1269                    );
1270                }
1271            }
1272        }
1273
1274        if skipped_count > 0 {
1275            tracing::info!(
1276                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1277            );
1278        }
1279
1280        if inactive_count > 0 {
1281            tracing::info!(
1282                "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1283            );
1284        }
1285
1286        if failed_count > 0 {
1287            tracing::error!(
1288                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1289                parsed_instruments.len()
1290            );
1291        }
1292
1293        Ok(parsed_instruments)
1294    }
1295
1296    /// Get user wallet information.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1301    ///
1302    /// # Panics
1303    ///
1304    /// Panics if the inner mutex is poisoned.
1305    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1306        let inner = self.inner.clone();
1307        inner.get_wallet().await
1308    }
1309
1310    /// Get user orders.
1311    ///
1312    /// # Errors
1313    ///
1314    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1315    ///
1316    /// # Panics
1317    ///
1318    /// Panics if the inner mutex is poisoned.
1319    pub async fn get_orders(
1320        &self,
1321        params: GetOrderParams,
1322    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1323        let inner = self.inner.clone();
1324        inner.get_orders(params).await
1325    }
1326
1327    /// Get instrument from the instruments cache (if found).
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if the instrument is not found in the cache.
1332    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1333        self.get_instrument(&symbol).ok_or_else(|| {
1334            anyhow::anyhow!(
1335                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1336            )
1337        })
1338    }
1339
1340    /// Returns the cached price precision for the given symbol.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if the instrument was never cached (for example, if
1345    /// instruments were not loaded prior to use).
1346    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1347        self.instrument_from_cache(symbol)
1348            .map(|instrument| instrument.price_precision())
1349    }
1350
1351    /// Get user margin information.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1356    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1357        self.inner
1358            .get_margin(currency)
1359            .await
1360            .map_err(|e| anyhow::anyhow!(e))
1361    }
1362
1363    /// Request account state for the given account.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if the HTTP request fails or no account state is returned.
1368    pub async fn request_account_state(
1369        &self,
1370        account_id: AccountId,
1371    ) -> anyhow::Result<AccountState> {
1372        // Get margin data for XBt (Bitcoin) by default
1373        let margin = self
1374            .inner
1375            .get_margin("XBt")
1376            .await
1377            .map_err(|e| anyhow::anyhow!(e))?;
1378
1379        let ts_init =
1380            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1381
1382        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1383        let margin_msg = BitmexMarginMsg {
1384            account: margin.account,
1385            currency: margin.currency,
1386            risk_limit: margin.risk_limit,
1387            amount: margin.amount,
1388            prev_realised_pnl: margin.prev_realised_pnl,
1389            gross_comm: margin.gross_comm,
1390            gross_open_cost: margin.gross_open_cost,
1391            gross_open_premium: margin.gross_open_premium,
1392            gross_exec_cost: margin.gross_exec_cost,
1393            gross_mark_value: margin.gross_mark_value,
1394            risk_value: margin.risk_value,
1395            init_margin: margin.init_margin,
1396            maint_margin: margin.maint_margin,
1397            target_excess_margin: margin.target_excess_margin,
1398            realised_pnl: margin.realised_pnl,
1399            unrealised_pnl: margin.unrealised_pnl,
1400            wallet_balance: margin.wallet_balance,
1401            margin_balance: margin.margin_balance,
1402            margin_leverage: margin.margin_leverage,
1403            margin_used_pcnt: margin.margin_used_pcnt,
1404            excess_margin: margin.excess_margin,
1405            available_margin: margin.available_margin,
1406            withdrawable_margin: margin.withdrawable_margin,
1407            maker_fee_discount: None, // Not in HTTP response
1408            taker_fee_discount: None, // Not in HTTP response
1409            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1410            foreign_margin_balance: None,
1411            foreign_requirement: None,
1412        };
1413
1414        parse_account_state(&margin_msg, account_id, ts_init)
1415    }
1416
1417    /// Submit a new order.
1418    ///
1419    /// # Errors
1420    ///
1421    /// Returns an error if credentials are missing, the request fails, order validation fails,
1422    /// the order is rejected, or the API returns an error.
1423    #[allow(clippy::too_many_arguments)]
1424    pub async fn submit_order(
1425        &self,
1426        instrument_id: InstrumentId,
1427        client_order_id: ClientOrderId,
1428        order_side: OrderSide,
1429        order_type: OrderType,
1430        quantity: Quantity,
1431        time_in_force: TimeInForce,
1432        price: Option<Price>,
1433        trigger_price: Option<Price>,
1434        trigger_type: Option<TriggerType>,
1435        display_qty: Option<Quantity>,
1436        post_only: bool,
1437        reduce_only: bool,
1438        order_list_id: Option<OrderListId>,
1439        contingency_type: Option<ContingencyType>,
1440    ) -> anyhow::Result<OrderStatusReport> {
1441        use crate::common::enums::{
1442            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1443        };
1444
1445        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1446
1447        let mut params = super::query::PostOrderParamsBuilder::default();
1448        params.text(NAUTILUS_TRADER);
1449        params.symbol(instrument_id.symbol.as_str());
1450        params.cl_ord_id(client_order_id.as_str());
1451
1452        let side = BitmexSide::try_from_order_side(order_side)?;
1453        params.side(side);
1454
1455        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1456        params.ord_type(ord_type);
1457
1458        params.order_qty(quantity_to_u32(&quantity, &instrument));
1459
1460        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1461        params.time_in_force(tif);
1462
1463        if let Some(price) = price {
1464            params.price(price.as_f64());
1465        }
1466
1467        if let Some(trigger_price) = trigger_price {
1468            params.stop_px(trigger_price.as_f64());
1469        }
1470
1471        if let Some(display_qty) = display_qty {
1472            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1473        }
1474
1475        if let Some(order_list_id) = order_list_id {
1476            params.cl_ord_link_id(order_list_id.as_str());
1477        }
1478
1479        let mut exec_inst = Vec::new();
1480
1481        if post_only {
1482            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1483        }
1484
1485        if reduce_only {
1486            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1487        }
1488
1489        if trigger_price.is_some()
1490            && let Some(trigger_type) = trigger_type
1491        {
1492            match trigger_type {
1493                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1494                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1495                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1496                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1497            }
1498        }
1499
1500        if !exec_inst.is_empty() {
1501            params.exec_inst(exec_inst);
1502        }
1503
1504        if let Some(contingency_type) = contingency_type {
1505            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1506            params.contingency_type(bitmex_contingency);
1507        }
1508
1509        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1510
1511        let response = self.inner.place_order(params).await?;
1512
1513        let order: BitmexOrder = serde_json::from_value(response)?;
1514
1515        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1516            let reason = order
1517                .ord_rej_reason
1518                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1519            anyhow::bail!("Order rejected: {reason}");
1520        }
1521
1522        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1523        let ts_init = self.generate_ts_init();
1524
1525        parse_order_status_report(&order, &instrument, ts_init)
1526    }
1527
1528    /// Cancel an order.
1529    ///
1530    /// # Errors
1531    ///
1532    /// Returns an error if:
1533    /// - Credentials are missing.
1534    /// - The request fails.
1535    /// - The order doesn't exist.
1536    /// - The API returns an error.
1537    pub async fn cancel_order(
1538        &self,
1539        instrument_id: InstrumentId,
1540        client_order_id: Option<ClientOrderId>,
1541        venue_order_id: Option<VenueOrderId>,
1542    ) -> anyhow::Result<OrderStatusReport> {
1543        let mut params = super::query::DeleteOrderParamsBuilder::default();
1544        params.text(NAUTILUS_TRADER);
1545
1546        if let Some(venue_order_id) = venue_order_id {
1547            params.order_id(vec![venue_order_id.as_str().to_string()]);
1548        } else if let Some(client_order_id) = client_order_id {
1549            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1550        } else {
1551            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1552        }
1553
1554        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1555
1556        let response = self.inner.cancel_orders(params).await?;
1557
1558        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1559        let order = orders
1560            .into_iter()
1561            .next()
1562            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1563
1564        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1565        let ts_init = self.generate_ts_init();
1566
1567        parse_order_status_report(&order, &instrument, ts_init)
1568    }
1569
1570    /// Cancel multiple orders.
1571    ///
1572    /// # Errors
1573    ///
1574    /// Returns an error if:
1575    /// - Credentials are missing.
1576    /// - The request fails.
1577    /// - The order doesn't exist.
1578    /// - The API returns an error.
1579    pub async fn cancel_orders(
1580        &self,
1581        instrument_id: InstrumentId,
1582        client_order_ids: Option<Vec<ClientOrderId>>,
1583        venue_order_ids: Option<Vec<VenueOrderId>>,
1584    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1585        let mut params = super::query::DeleteOrderParamsBuilder::default();
1586        params.text(NAUTILUS_TRADER);
1587
1588        // BitMEX API requires either client order IDs or venue order IDs, not both
1589        // Prioritize venue order IDs if both are provided
1590        if let Some(venue_order_ids) = venue_order_ids {
1591            if venue_order_ids.is_empty() {
1592                anyhow::bail!("venue_order_ids cannot be empty");
1593            }
1594            params.order_id(
1595                venue_order_ids
1596                    .iter()
1597                    .map(|id| id.to_string())
1598                    .collect::<Vec<_>>(),
1599            );
1600        } else if let Some(client_order_ids) = client_order_ids {
1601            if client_order_ids.is_empty() {
1602                anyhow::bail!("client_order_ids cannot be empty");
1603            }
1604            params.cl_ord_id(
1605                client_order_ids
1606                    .iter()
1607                    .map(|id| id.to_string())
1608                    .collect::<Vec<_>>(),
1609            );
1610        } else {
1611            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1612        }
1613
1614        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1615
1616        let response = self.inner.cancel_orders(params).await?;
1617
1618        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1619
1620        let ts_init = self.generate_ts_init();
1621        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1622
1623        let mut reports = Vec::new();
1624
1625        for order in orders {
1626            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1627        }
1628
1629        Self::populate_linked_order_ids(&mut reports);
1630
1631        Ok(reports)
1632    }
1633
1634    /// Cancel all orders for an instrument and optionally an order side.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if:
1639    /// - Credentials are missing.
1640    /// - The request fails.
1641    /// - The order doesn't exist.
1642    /// - The API returns an error.
1643    pub async fn cancel_all_orders(
1644        &self,
1645        instrument_id: InstrumentId,
1646        order_side: Option<OrderSide>,
1647    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1648        let mut params = DeleteAllOrdersParamsBuilder::default();
1649        params.text(NAUTILUS_TRADER);
1650        params.symbol(instrument_id.symbol.as_str());
1651
1652        if let Some(side) = order_side {
1653            let side = BitmexSide::try_from_order_side(side)?;
1654            params.filter(serde_json::json!({
1655                "side": side
1656            }));
1657        }
1658
1659        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1660
1661        let response = self.inner.cancel_all_orders(params).await?;
1662
1663        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1664
1665        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1666        let ts_init = self.generate_ts_init();
1667
1668        let mut reports = Vec::new();
1669
1670        for order in orders {
1671            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1672        }
1673
1674        Self::populate_linked_order_ids(&mut reports);
1675
1676        Ok(reports)
1677    }
1678
1679    /// Modify an existing order.
1680    ///
1681    /// # Errors
1682    ///
1683    /// Returns an error if:
1684    /// - Credentials are missing.
1685    /// - The request fails.
1686    /// - The order doesn't exist.
1687    /// - The order is already closed.
1688    /// - The API returns an error.
1689    pub async fn modify_order(
1690        &self,
1691        instrument_id: InstrumentId,
1692        client_order_id: Option<ClientOrderId>,
1693        venue_order_id: Option<VenueOrderId>,
1694        quantity: Option<Quantity>,
1695        price: Option<Price>,
1696        trigger_price: Option<Price>,
1697    ) -> anyhow::Result<OrderStatusReport> {
1698        let mut params = PutOrderParamsBuilder::default();
1699        params.text(NAUTILUS_TRADER);
1700
1701        // Set order ID - prefer venue_order_id if available
1702        if let Some(venue_order_id) = venue_order_id {
1703            params.order_id(venue_order_id.as_str());
1704        } else if let Some(client_order_id) = client_order_id {
1705            params.orig_cl_ord_id(client_order_id.as_str());
1706        } else {
1707            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1708        }
1709
1710        if let Some(quantity) = quantity {
1711            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1712            params.order_qty(quantity_to_u32(&quantity, &instrument));
1713        }
1714
1715        if let Some(price) = price {
1716            params.price(price.as_f64());
1717        }
1718
1719        if let Some(trigger_price) = trigger_price {
1720            params.stop_px(trigger_price.as_f64());
1721        }
1722
1723        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1724
1725        let response = self.inner.amend_order(params).await?;
1726
1727        let order: BitmexOrder = serde_json::from_value(response)?;
1728
1729        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1730            let reason = order
1731                .ord_rej_reason
1732                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1733            anyhow::bail!("Order modification rejected: {reason}");
1734        }
1735
1736        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1737        let ts_init = self.generate_ts_init();
1738
1739        parse_order_status_report(&order, &instrument, ts_init)
1740    }
1741
1742    /// Query a single order by client order ID or venue order ID.
1743    ///
1744    /// # Errors
1745    ///
1746    /// Returns an error if:
1747    /// - Credentials are missing.
1748    /// - The request fails.
1749    /// - The API returns an error.
1750    pub async fn query_order(
1751        &self,
1752        instrument_id: InstrumentId,
1753        client_order_id: Option<ClientOrderId>,
1754        venue_order_id: Option<VenueOrderId>,
1755    ) -> anyhow::Result<Option<OrderStatusReport>> {
1756        let mut params = GetOrderParamsBuilder::default();
1757
1758        let filter_json = if let Some(client_order_id) = client_order_id {
1759            serde_json::json!({
1760                "clOrdID": client_order_id.to_string()
1761            })
1762        } else if let Some(venue_order_id) = venue_order_id {
1763            serde_json::json!({
1764                "orderID": venue_order_id.to_string()
1765            })
1766        } else {
1767            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1768        };
1769
1770        params.filter(filter_json);
1771        params.count(1); // Only need one order
1772
1773        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775        let response = self.inner.get_orders(params).await?;
1776
1777        if response.is_empty() {
1778            return Ok(None);
1779        }
1780
1781        let order = &response[0];
1782
1783        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1784        let ts_init = self.generate_ts_init();
1785
1786        let report = parse_order_status_report(order, &instrument, ts_init)?;
1787
1788        Ok(Some(report))
1789    }
1790
1791    /// Request a single order status report.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if:
1796    /// - Credentials are missing.
1797    /// - The request fails.
1798    /// - The API returns an error.
1799    pub async fn request_order_status_report(
1800        &self,
1801        instrument_id: InstrumentId,
1802        client_order_id: Option<ClientOrderId>,
1803        venue_order_id: Option<VenueOrderId>,
1804    ) -> anyhow::Result<OrderStatusReport> {
1805        let mut params = GetOrderParamsBuilder::default();
1806        params.symbol(instrument_id.symbol.as_str());
1807
1808        if let Some(venue_order_id) = venue_order_id {
1809            params.filter(serde_json::json!({
1810                "orderID": venue_order_id.as_str()
1811            }));
1812        } else if let Some(client_order_id) = client_order_id {
1813            params.filter(serde_json::json!({
1814                "clOrdID": client_order_id.as_str()
1815            }));
1816        }
1817
1818        params.count(1i32);
1819        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1820
1821        let response = self.inner.get_orders(params).await?;
1822
1823        let order = response
1824            .into_iter()
1825            .next()
1826            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1827
1828        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1829        let ts_init = self.generate_ts_init();
1830
1831        parse_order_status_report(&order, &instrument, ts_init)
1832    }
1833
1834    /// Request multiple order status reports.
1835    ///
1836    /// # Errors
1837    ///
1838    /// Returns an error if:
1839    /// - Credentials are missing.
1840    /// - The request fails.
1841    /// - The API returns an error.
1842    pub async fn request_order_status_reports(
1843        &self,
1844        instrument_id: Option<InstrumentId>,
1845        open_only: bool,
1846        limit: Option<u32>,
1847    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1848        let mut params = GetOrderParamsBuilder::default();
1849
1850        if let Some(instrument_id) = &instrument_id {
1851            params.symbol(instrument_id.symbol.as_str());
1852        }
1853
1854        if open_only {
1855            params.filter(serde_json::json!({
1856                "open": true
1857            }));
1858        }
1859
1860        if let Some(limit) = limit {
1861            params.count(limit as i32);
1862        } else {
1863            params.count(500); // Default count to avoid empty query
1864        }
1865
1866        params.reverse(true); // Get newest orders first
1867
1868        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1869
1870        let response = self.inner.get_orders(params).await?;
1871
1872        let ts_init = self.generate_ts_init();
1873
1874        let mut reports = Vec::new();
1875
1876        for order in response {
1877            // Skip orders without symbol (can happen with query responses)
1878            let Some(symbol) = order.symbol else {
1879                tracing::warn!("Order response missing symbol, skipping");
1880                continue;
1881            };
1882
1883            let Ok(instrument) = self.instrument_from_cache(symbol) else {
1884                tracing::debug!(
1885                    symbol = %symbol,
1886                    "Skipping order report for instrument not in cache"
1887                );
1888                continue;
1889            };
1890
1891            match parse_order_status_report(&order, &instrument, ts_init) {
1892                Ok(report) => reports.push(report),
1893                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1894            }
1895        }
1896
1897        Self::populate_linked_order_ids(&mut reports);
1898
1899        Ok(reports)
1900    }
1901
1902    /// Request trades for the given instrument.
1903    ///
1904    /// # Errors
1905    ///
1906    /// Returns an error if the HTTP request fails or parsing fails.
1907    pub async fn request_trades(
1908        &self,
1909        instrument_id: InstrumentId,
1910        start: Option<DateTime<Utc>>,
1911        end: Option<DateTime<Utc>>,
1912        limit: Option<u32>,
1913    ) -> anyhow::Result<Vec<TradeTick>> {
1914        let mut params = GetTradeParamsBuilder::default();
1915        params.symbol(instrument_id.symbol.as_str());
1916
1917        if let Some(start) = start {
1918            params.start_time(start);
1919        }
1920
1921        if let Some(end) = end {
1922            params.end_time(end);
1923        }
1924
1925        if let (Some(start), Some(end)) = (start, end) {
1926            anyhow::ensure!(
1927                start < end,
1928                "Invalid time range: start={start:?} end={end:?}",
1929            );
1930        }
1931
1932        if let Some(limit) = limit {
1933            let clamped_limit = limit.min(1000);
1934            if limit > 1000 {
1935                tracing::warn!(
1936                    limit,
1937                    clamped_limit,
1938                    "BitMEX trade request limit exceeds venue maximum; clamping",
1939                );
1940            }
1941            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1942        }
1943        params.reverse(false);
1944        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1945
1946        let response = self.inner.get_trades(params).await?;
1947
1948        let ts_init = self.generate_ts_init();
1949
1950        let mut parsed_trades = Vec::new();
1951
1952        for trade in response {
1953            if let Some(start) = start
1954                && trade.timestamp < start
1955            {
1956                continue;
1957            }
1958
1959            if let Some(end) = end
1960                && trade.timestamp > end
1961            {
1962                continue;
1963            }
1964
1965            let price_precision = self.get_price_precision(trade.symbol)?;
1966
1967            match parse_trade(trade, price_precision, ts_init) {
1968                Ok(trade) => parsed_trades.push(trade),
1969                Err(e) => tracing::error!("Failed to parse trade: {e}"),
1970            }
1971        }
1972
1973        Ok(parsed_trades)
1974    }
1975
1976    /// Request bars for the given bar type.
1977    ///
1978    /// # Errors
1979    ///
1980    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1981    /// unsupported by BitMEX.
1982    pub async fn request_bars(
1983        &self,
1984        mut bar_type: BarType,
1985        start: Option<DateTime<Utc>>,
1986        end: Option<DateTime<Utc>>,
1987        limit: Option<u32>,
1988        partial: bool,
1989    ) -> anyhow::Result<Vec<Bar>> {
1990        bar_type = bar_type.standard();
1991
1992        anyhow::ensure!(
1993            bar_type.aggregation_source() == AggregationSource::External,
1994            "Only EXTERNAL aggregation bars are supported"
1995        );
1996        anyhow::ensure!(
1997            bar_type.spec().price_type == PriceType::Last,
1998            "Only LAST price type bars are supported"
1999        );
2000        if let (Some(start), Some(end)) = (start, end) {
2001            anyhow::ensure!(
2002                start < end,
2003                "Invalid time range: start={start:?} end={end:?}"
2004            );
2005        }
2006
2007        let spec = bar_type.spec();
2008        let bin_size = match (spec.aggregation, spec.step.get()) {
2009            (BarAggregation::Minute, 1) => "1m",
2010            (BarAggregation::Minute, 5) => "5m",
2011            (BarAggregation::Hour, 1) => "1h",
2012            (BarAggregation::Day, 1) => "1d",
2013            _ => anyhow::bail!(
2014                "BitMEX does not support {}-{:?}-{:?} bars",
2015                spec.step.get(),
2016                spec.aggregation,
2017                spec.price_type,
2018            ),
2019        };
2020
2021        let instrument_id = bar_type.instrument_id();
2022        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2023
2024        let mut params = GetTradeBucketedParamsBuilder::default();
2025        params.symbol(instrument_id.symbol.as_str());
2026        params.bin_size(bin_size);
2027        if partial {
2028            params.partial(true);
2029        }
2030        if let Some(start) = start {
2031            params.start_time(start);
2032        }
2033        if let Some(end) = end {
2034            params.end_time(end);
2035        }
2036        if let Some(limit) = limit {
2037            let clamped_limit = limit.min(1000);
2038            if limit > 1000 {
2039                tracing::warn!(
2040                    limit,
2041                    clamped_limit,
2042                    "BitMEX bar request limit exceeds venue maximum; clamping",
2043                );
2044            }
2045            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2046        }
2047        params.reverse(false);
2048        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2049
2050        let response = self.inner.get_trade_bucketed(params).await?;
2051        let ts_init = self.generate_ts_init();
2052        let mut bars = Vec::new();
2053
2054        for bin in response {
2055            if let Some(start) = start
2056                && bin.timestamp < start
2057            {
2058                continue;
2059            }
2060            if let Some(end) = end
2061                && bin.timestamp > end
2062            {
2063                continue;
2064            }
2065            if bin.symbol != instrument_id.symbol.inner() {
2066                tracing::warn!(
2067                    symbol = %bin.symbol,
2068                    expected = %instrument_id.symbol,
2069                    "Skipping trade bin for unexpected symbol",
2070                );
2071                continue;
2072            }
2073
2074            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2075                Ok(bar) => bars.push(bar),
2076                Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2077            }
2078        }
2079
2080        Ok(bars)
2081    }
2082
2083    /// Request fill reports for the given instrument.
2084    ///
2085    /// # Errors
2086    ///
2087    /// Returns an error if the HTTP request fails or parsing fails.
2088    pub async fn request_fill_reports(
2089        &self,
2090        instrument_id: Option<InstrumentId>,
2091        limit: Option<u32>,
2092    ) -> anyhow::Result<Vec<FillReport>> {
2093        let mut params = GetExecutionParamsBuilder::default();
2094        if let Some(instrument_id) = instrument_id {
2095            params.symbol(instrument_id.symbol.as_str());
2096        }
2097        if let Some(limit) = limit {
2098            params.count(limit as i32);
2099        } else {
2100            params.count(500); // Default count
2101        }
2102        params.reverse(true); // Get newest fills first
2103
2104        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2105
2106        let response = self.inner.get_executions(params).await?;
2107
2108        let ts_init = self.generate_ts_init();
2109
2110        let mut reports = Vec::new();
2111
2112        for exec in response {
2113            // Skip executions without symbol (e.g., CancelReject)
2114            let Some(symbol) = exec.symbol else {
2115                tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2116                continue;
2117            };
2118            let symbol_str = symbol.to_string();
2119
2120            let instrument = match self.instrument_from_cache(symbol) {
2121                Ok(instrument) => instrument,
2122                Err(e) => {
2123                    tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2124                    continue;
2125                }
2126            };
2127
2128            match parse_fill_report(exec, &instrument, ts_init) {
2129                Ok(report) => reports.push(report),
2130                Err(e) => {
2131                    // Log at debug level for expected skip cases
2132                    let error_msg = e.to_string();
2133                    if error_msg.starts_with("Skipping non-trade execution")
2134                        || error_msg.starts_with("Skipping execution without order_id")
2135                    {
2136                        tracing::debug!("{e}");
2137                    } else {
2138                        tracing::error!("Failed to parse fill report: {e}");
2139                    }
2140                }
2141            }
2142        }
2143
2144        Ok(reports)
2145    }
2146
2147    /// Request position reports.
2148    ///
2149    /// # Errors
2150    ///
2151    /// Returns an error if the HTTP request fails or parsing fails.
2152    pub async fn request_position_status_reports(
2153        &self,
2154    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2155        let params = GetPositionParamsBuilder::default()
2156            .count(500) // Default count
2157            .build()
2158            .map_err(|e| anyhow::anyhow!(e))?;
2159
2160        let response = self.inner.get_positions(params).await?;
2161
2162        let ts_init = self.generate_ts_init();
2163
2164        let mut reports = Vec::new();
2165
2166        for pos in response {
2167            let symbol = pos.symbol;
2168            let instrument = match self.instrument_from_cache(symbol) {
2169                Ok(instrument) => instrument,
2170                Err(e) => {
2171                    tracing::error!(
2172                        symbol = pos.symbol.as_str(),
2173                        "Instrument not found in cache for position parsing: {e}"
2174                    );
2175                    continue;
2176                }
2177            };
2178
2179            match parse_position_report(pos, &instrument, ts_init) {
2180                Ok(report) => reports.push(report),
2181                Err(e) => tracing::error!("Failed to parse position report: {e}"),
2182            }
2183        }
2184
2185        Ok(reports)
2186    }
2187
2188    /// Update position leverage.
2189    ///
2190    /// # Errors
2191    ///
2192    /// - Credentials are missing.
2193    /// - The request fails.
2194    /// - The API returns an error.
2195    pub async fn update_position_leverage(
2196        &self,
2197        symbol: &str,
2198        leverage: f64,
2199    ) -> anyhow::Result<PositionStatusReport> {
2200        let params = PostPositionLeverageParams {
2201            symbol: symbol.to_string(),
2202            leverage,
2203            target_account_id: None,
2204        };
2205
2206        let response = self.inner.update_position_leverage(params).await?;
2207
2208        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2209        let ts_init = self.generate_ts_init();
2210
2211        parse_position_report(response, &instrument, ts_init)
2212    }
2213}
2214
2215#[cfg(test)]
2216mod tests {
2217    use nautilus_core::UUID4;
2218    use nautilus_model::enums::OrderStatus;
2219    use rstest::rstest;
2220    use serde_json::json;
2221
2222    use super::*;
2223
2224    fn build_report(
2225        client_order_id: &str,
2226        venue_order_id: &str,
2227        contingency_type: ContingencyType,
2228        order_list_id: Option<&str>,
2229    ) -> OrderStatusReport {
2230        let mut report = OrderStatusReport::new(
2231            AccountId::from("BITMEX-1"),
2232            InstrumentId::from("XBTUSD.BITMEX"),
2233            Some(ClientOrderId::from(client_order_id)),
2234            VenueOrderId::from(venue_order_id),
2235            OrderSide::Buy,
2236            OrderType::Limit,
2237            TimeInForce::Gtc,
2238            OrderStatus::Accepted,
2239            Quantity::new(100.0, 0),
2240            Quantity::default(),
2241            UnixNanos::from(1_u64),
2242            UnixNanos::from(1_u64),
2243            UnixNanos::from(1_u64),
2244            Some(UUID4::new()),
2245        );
2246
2247        if let Some(id) = order_list_id {
2248            report = report.with_order_list_id(OrderListId::from(id));
2249        }
2250
2251        report.with_contingency_type(contingency_type)
2252    }
2253
2254    #[rstest]
2255    fn test_sign_request_generates_correct_headers() {
2256        let client = BitmexRawHttpClient::with_credentials(
2257            "test_api_key".to_string(),
2258            "test_api_secret".to_string(),
2259            "http://localhost:8080".to_string(),
2260            Some(60),
2261            None, // max_retries
2262            None, // retry_delay_ms
2263            None, // retry_delay_max_ms
2264            None, // recv_window_ms
2265            None, // max_requests_per_second
2266            None, // max_requests_per_minute
2267            None, // proxy_url
2268        )
2269        .expect("Failed to create test client");
2270
2271        let headers = client
2272            .sign_request(&Method::GET, "/api/v1/order", None)
2273            .unwrap();
2274
2275        assert!(headers.contains_key("api-key"));
2276        assert!(headers.contains_key("api-signature"));
2277        assert!(headers.contains_key("api-expires"));
2278        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2279    }
2280
2281    #[rstest]
2282    fn test_sign_request_with_body() {
2283        let client = BitmexRawHttpClient::with_credentials(
2284            "test_api_key".to_string(),
2285            "test_api_secret".to_string(),
2286            "http://localhost:8080".to_string(),
2287            Some(60),
2288            None, // max_retries
2289            None, // retry_delay_ms
2290            None, // retry_delay_max_ms
2291            None, // recv_window_ms
2292            None, // max_requests_per_second
2293            None, // max_requests_per_minute
2294            None, // proxy_url
2295        )
2296        .expect("Failed to create test client");
2297
2298        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2299        let body_bytes = serde_json::to_vec(&body).unwrap();
2300
2301        let headers_without_body = client
2302            .sign_request(&Method::POST, "/api/v1/order", None)
2303            .unwrap();
2304        let headers_with_body = client
2305            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2306            .unwrap();
2307
2308        // Signatures should be different when body is included
2309        assert_ne!(
2310            headers_without_body.get("api-signature").unwrap(),
2311            headers_with_body.get("api-signature").unwrap()
2312        );
2313    }
2314
2315    #[rstest]
2316    fn test_sign_request_uses_custom_recv_window() {
2317        let client_default = BitmexRawHttpClient::with_credentials(
2318            "test_api_key".to_string(),
2319            "test_api_secret".to_string(),
2320            "http://localhost:8080".to_string(),
2321            Some(60),
2322            None,
2323            None,
2324            None,
2325            None, // Use default recv_window_ms (10000ms = 10s)
2326            None, // max_requests_per_second
2327            None, // max_requests_per_minute
2328            None, // proxy_url
2329        )
2330        .expect("Failed to create test client");
2331
2332        let client_custom = BitmexRawHttpClient::with_credentials(
2333            "test_api_key".to_string(),
2334            "test_api_secret".to_string(),
2335            "http://localhost:8080".to_string(),
2336            Some(60),
2337            None,
2338            None,
2339            None,
2340            Some(30_000), // 30 seconds
2341            None,         // max_requests_per_second
2342            None,         // max_requests_per_minute
2343            None,         // proxy_url
2344        )
2345        .expect("Failed to create test client");
2346
2347        let headers_default = client_default
2348            .sign_request(&Method::GET, "/api/v1/order", None)
2349            .unwrap();
2350        let headers_custom = client_custom
2351            .sign_request(&Method::GET, "/api/v1/order", None)
2352            .unwrap();
2353
2354        // Parse expires timestamps
2355        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2356        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2357
2358        // Verify both are valid future timestamps
2359        let now = Utc::now().timestamp();
2360        assert!(expires_default > now);
2361        assert!(expires_custom > now);
2362
2363        // Custom window should be greater than default
2364        assert!(expires_custom > expires_default);
2365
2366        // The difference should be approximately 20 seconds (30s - 10s)
2367        // Allow wider tolerance for delays between calls on slow CI runners
2368        let diff = expires_custom - expires_default;
2369        assert!((18..=25).contains(&diff));
2370    }
2371
2372    #[rstest]
2373    fn test_populate_linked_order_ids_from_order_list() {
2374        let base = "O-20250922-002219-001-000";
2375        let entry = format!("{base}-1");
2376        let stop = format!("{base}-2");
2377        let take = format!("{base}-3");
2378
2379        let mut reports = vec![
2380            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2381            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2382            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2383        ];
2384
2385        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2386
2387        assert_eq!(
2388            reports[0].linked_order_ids,
2389            Some(vec![
2390                ClientOrderId::from(stop.as_str()),
2391                ClientOrderId::from(take.as_str()),
2392            ]),
2393        );
2394        assert_eq!(
2395            reports[1].linked_order_ids,
2396            Some(vec![
2397                ClientOrderId::from(entry.as_str()),
2398                ClientOrderId::from(take.as_str()),
2399            ]),
2400        );
2401        assert_eq!(
2402            reports[2].linked_order_ids,
2403            Some(vec![
2404                ClientOrderId::from(entry.as_str()),
2405                ClientOrderId::from(stop.as_str()),
2406            ]),
2407        );
2408    }
2409
2410    #[rstest]
2411    fn test_populate_linked_order_ids_from_id_prefix() {
2412        let base = "O-20250922-002220-001-000";
2413        let entry = format!("{base}-1");
2414        let stop = format!("{base}-2");
2415        let take = format!("{base}-3");
2416
2417        let mut reports = vec![
2418            build_report(&entry, "V-1", ContingencyType::Oto, None),
2419            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2420            build_report(&take, "V-3", ContingencyType::Ouo, None),
2421        ];
2422
2423        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2424
2425        assert_eq!(
2426            reports[0].linked_order_ids,
2427            Some(vec![
2428                ClientOrderId::from(stop.as_str()),
2429                ClientOrderId::from(take.as_str()),
2430            ]),
2431        );
2432        assert_eq!(
2433            reports[1].linked_order_ids,
2434            Some(vec![
2435                ClientOrderId::from(entry.as_str()),
2436                ClientOrderId::from(take.as_str()),
2437            ]),
2438        );
2439        assert_eq!(
2440            reports[2].linked_order_ids,
2441            Some(vec![
2442                ClientOrderId::from(entry.as_str()),
2443                ClientOrderId::from(stop.as_str()),
2444            ]),
2445        );
2446    }
2447
2448    #[rstest]
2449    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2450        let base = "O-20250922-002221-001-000";
2451        let entry = format!("{base}-1");
2452        let passive = format!("{base}-2");
2453
2454        let mut reports = vec![
2455            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2456            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2457        ];
2458
2459        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2460
2461        // Non-contingent orders should not be linked
2462        assert!(reports[0].linked_order_ids.is_none());
2463
2464        // A contingent order with no other contingent peers should have contingency reset
2465        assert!(reports[1].linked_order_ids.is_none());
2466        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2467    }
2468}