nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46        TimeInForce, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65    error::{BitmexErrorResponse, BitmexHttpError},
66    models::{
67        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69    },
70    query::{
71        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74        PostPositionLeverageParams, PutOrderParams,
75    },
76};
77use crate::{
78    common::{
79        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80        credential::Credential,
81        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82        parse::{parse_account_state, quantity_to_u32},
83    },
84    http::{
85        parse::{
86            InstrumentParseResult, parse_fill_report, parse_instrument_any,
87            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88        },
89        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90    },
91    websocket::messages::BitmexMarginMsg,
92};
93
/// Default BitMEX REST API rate limits.
///
/// BitMEX implements a dual-layer rate limiting system:
/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
///
/// This constant is the per-second quota used when the caller supplies none
/// (or supplies zero).
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Default per-minute quota for clients created with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Default per-minute quota for clients created without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
105
106static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
107    vec![
108        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
109        Ustr::from(BITMEX_MINUTE_RATE_KEY),
110    ]
111});
112
/// Represents a BitMEX HTTP response.
///
/// Generic envelope that holds a list of records of type `T` under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The typed data returned by the BitMEX endpoint.
    pub data: Vec<T>,
}
119
/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to BitMEX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
///
/// # Connection Management
///
/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
/// reused for subsequent requests to minimize latency.
///
/// # Rate Limiting
///
/// BitMEX enforces the following rate limits:
/// - 120 requests per minute for authenticated users (30 for unauthenticated).
/// - 10 requests per second burst limit for certain endpoints (order management).
///
/// The client automatically respects these limits through the configured quota.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Base URL that endpoint paths are appended to when building request URLs.
    base_url: String,
    /// Underlying HTTP client, built with the default headers and rate limiter quotas.
    client: HttpClient,
    /// API credential used to sign requests; `None` for clients created without credentials.
    credential: Option<Credential>,
    /// Validity window in milliseconds; converted to whole seconds and added to the
    /// current time to produce the `api-expires` value of signed requests.
    recv_window_ms: u64,
    /// Retry manager through which every request is executed.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token handed to the retry manager so that pending requests can be cancelled.
    cancellation_token: CancellationToken,
}
148
149impl Default for BitmexRawHttpClient {
150    fn default() -> Self {
151        Self::new(None, Some(60), None, None, None, None, None, None, None)
152            .expect("Failed to create default BitmexHttpInnerClient")
153    }
154}
155
156impl BitmexRawHttpClient {
157    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
158    /// optionally overridden with a custom base URL.
159    ///
160    /// This version of the client has **no credentials**, so it can only
161    /// call publicly accessible endpoints.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the retry manager cannot be created.
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        base_url: Option<String>,
169        timeout_secs: Option<u64>,
170        max_retries: Option<u32>,
171        retry_delay_ms: Option<u64>,
172        retry_delay_max_ms: Option<u64>,
173        recv_window_ms: Option<u64>,
174        max_requests_per_second: Option<u32>,
175        max_requests_per_minute: Option<u32>,
176        proxy_url: Option<String>,
177    ) -> Result<Self, BitmexHttpError> {
178        let retry_config = RetryConfig {
179            max_retries: max_retries.unwrap_or(3),
180            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182            backoff_factor: 2.0,
183            jitter_ms: 1000,
184            operation_timeout_ms: Some(60_000),
185            immediate_first: false,
186            max_elapsed_ms: Some(180_000),
187        };
188
189        let retry_manager = RetryManager::new(retry_config);
190
191        let max_req_per_sec =
192            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193        let max_req_per_min =
194            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196        Ok(Self {
197            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198            client: HttpClient::new(
199                Self::default_headers(),
200                vec![],
201                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202                Some(Self::default_quota(max_req_per_sec)),
203                timeout_secs,
204                proxy_url,
205            )
206            .map_err(|e| {
207                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208            })?,
209            credential: None,
210            recv_window_ms: recv_window_ms.unwrap_or(10_000),
211            retry_manager,
212            cancellation_token: CancellationToken::new(),
213        })
214    }
215
216    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
217    /// for authenticated requests, optionally using a custom base URL.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the retry manager cannot be created.
222    #[allow(clippy::too_many_arguments)]
223    pub fn with_credentials(
224        api_key: String,
225        api_secret: String,
226        base_url: String,
227        timeout_secs: Option<u64>,
228        max_retries: Option<u32>,
229        retry_delay_ms: Option<u64>,
230        retry_delay_max_ms: Option<u64>,
231        recv_window_ms: Option<u64>,
232        max_requests_per_second: Option<u32>,
233        max_requests_per_minute: Option<u32>,
234        proxy_url: Option<String>,
235    ) -> Result<Self, BitmexHttpError> {
236        let retry_config = RetryConfig {
237            max_retries: max_retries.unwrap_or(3),
238            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240            backoff_factor: 2.0,
241            jitter_ms: 1000,
242            operation_timeout_ms: Some(60_000),
243            immediate_first: false,
244            max_elapsed_ms: Some(180_000),
245        };
246
247        let retry_manager = RetryManager::new(retry_config);
248
249        let max_req_per_sec =
250            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251        let max_req_per_min =
252            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254        Ok(Self {
255            base_url,
256            client: HttpClient::new(
257                Self::default_headers(),
258                vec![],
259                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260                Some(Self::default_quota(max_req_per_sec)),
261                timeout_secs,
262                proxy_url,
263            )
264            .map_err(|e| {
265                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266            })?,
267            credential: Some(Credential::new(api_key, api_secret)),
268            recv_window_ms: recv_window_ms.unwrap_or(10_000),
269            retry_manager,
270            cancellation_token: CancellationToken::new(),
271        })
272    }
273
274    fn default_headers() -> HashMap<String, String> {
275        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276    }
277
278    fn default_quota(max_requests_per_second: u32) -> Quota {
279        Quota::per_second(
280            NonZeroU32::new(max_requests_per_second)
281                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282        )
283    }
284
285    fn rate_limiter_quotas(
286        max_requests_per_second: u32,
287        max_requests_per_minute: u32,
288    ) -> Vec<(String, Quota)> {
289        let per_sec_quota = Quota::per_second(
290            NonZeroU32::new(max_requests_per_second)
291                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292        );
293        let per_min_quota =
294            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296            }));
297
298        vec![
299            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301        ]
302    }
303
304    fn rate_limit_keys() -> Vec<Ustr> {
305        RATE_LIMIT_KEYS.clone()
306    }
307
    /// Cancel all pending HTTP requests.
    ///
    /// Triggers this client's cancellation token, which `send_request` hands to the
    /// retry manager for every request.
    // NOTE(review): the token is never replaced in the visible code, so cancellation
    // looks permanent for this client — confirm it is not reused afterwards.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
312
    /// Get the cancellation token for this client.
    ///
    /// This is the same token that is passed to the retry manager for every request.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
317
318    fn sign_request(
319        &self,
320        method: &Method,
321        endpoint: &str,
322        body: Option<&[u8]>,
323    ) -> Result<HashMap<String, String>, BitmexHttpError> {
324        let credential = self
325            .credential
326            .as_ref()
327            .ok_or(BitmexHttpError::MissingCredentials)?;
328
329        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332        let full_path = if endpoint.starts_with("/api/v1") {
333            endpoint.to_string()
334        } else {
335            format!("/api/v1{endpoint}")
336        };
337
338        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340        let mut headers = HashMap::new();
341        headers.insert("api-expires".to_string(), expires.to_string());
342        headers.insert("api-key".to_string(), credential.api_key.to_string());
343        headers.insert("api-signature".to_string(), signature);
344
345        // Add Content-Type header for form-encoded body
346        if body.is_some()
347            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348        {
349            headers.insert(
350                "Content-Type".to_string(),
351                "application/x-www-form-urlencoded".to_string(),
352            );
353        }
354
355        Ok(headers)
356    }
357
358    async fn send_request<T: DeserializeOwned, P: Serialize>(
359        &self,
360        method: Method,
361        endpoint: &str,
362        params: Option<&P>,
363        body: Option<Vec<u8>>,
364        authenticate: bool,
365    ) -> Result<T, BitmexHttpError> {
366        let endpoint = endpoint.to_string();
367        let method_clone = method.clone();
368        let body_clone = body.clone();
369
370        // Serialize params before closure to avoid reference lifetime issues
371        // Query params are used with GET and DELETE methods
372        let params_str = if method == Method::GET || method == Method::DELETE {
373            params
374                .map(serde_urlencoded::to_string)
375                .transpose()
376                .map_err(|e| {
377                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378                })?
379        } else {
380            None
381        };
382
383        let full_endpoint = match params_str {
384            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385            _ => endpoint.clone(),
386        };
387
388        let url = format!("{}{}", self.base_url, full_endpoint);
389
390        let operation = || {
391            let url = url.clone();
392            let method = method_clone.clone();
393            let body = body_clone.clone();
394            let full_endpoint = full_endpoint.clone();
395
396            async move {
397                let headers = if authenticate {
398                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399                } else {
400                    None
401                };
402
403                let rate_keys = Self::rate_limit_keys();
404                let resp = self
405                    .client
406                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407                    .await?;
408
409                if resp.status.is_success() {
410                    serde_json::from_slice(&resp.body).map_err(Into::into)
411                } else if let Ok(error_resp) =
412                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413                {
414                    Err(error_resp.into())
415                } else {
416                    Err(BitmexHttpError::UnexpectedStatus {
417                        status: StatusCode::from_u16(resp.status.as_u16())
418                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419                        body: String::from_utf8_lossy(&resp.body).to_string(),
420                    })
421                }
422            }
423        };
424
425        // Retry strategy based on BitMEX error responses and HTTP status codes:
426        //
427        // 1. Network errors: always retry (transient connection issues).
428        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
429        // 3. BitMEX JSON errors with specific handling:
430        //    - "RateLimitError": explicit rate limit error from BitMEX.
431        //    - "HTTPError": generic error name used by BitMEX for various issues
432        //      Only retry if message contains "rate limit" to avoid retrying
433        //      non-transient errors like authentication failures, validation errors,
434        //      insufficient balance, etc. which also return as "HTTPError".
435        //
436        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
437        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
438        // be retried. We only retry when the message explicitly mentions rate limiting.
439        //
440        // See tests in tests/http.rs for retry behavior validation.
441        let should_retry = |error: &BitmexHttpError| -> bool {
442            match error {
443                BitmexHttpError::NetworkError(_) => true,
444                BitmexHttpError::UnexpectedStatus { status, .. } => {
445                    status.as_u16() >= 500 || status.as_u16() == 429
446                }
447                BitmexHttpError::BitmexError {
448                    error_name,
449                    message,
450                } => {
451                    error_name == "RateLimitError"
452                        || (error_name == "HTTPError"
453                            && message.to_lowercase().contains("rate limit"))
454                }
455                _ => false,
456            }
457        };
458
459        let create_error = |msg: String| -> BitmexHttpError {
460            if msg == "canceled" {
461                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462            } else {
463                BitmexHttpError::NetworkError(msg)
464            }
465        };
466
467        self.retry_manager
468            .execute_with_retry_with_cancel(
469                endpoint.as_str(),
470                operation,
471                should_retry,
472                create_error,
473                &self.cancellation_token,
474            )
475            .await
476    }
477
478    /// Get all instruments.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
483    pub async fn get_instruments(
484        &self,
485        active_only: bool,
486    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487        let path = if active_only {
488            "/instrument/active"
489        } else {
490            "/instrument"
491        };
492        self.send_request::<_, ()>(Method::GET, path, None, None, false)
493            .await
494    }
495
496    /// Requests the current server time from BitMEX.
497    ///
498    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
499    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the HTTP request fails or if the response body
504    /// cannot be parsed into [`BitmexApiInfo`].
505    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506        let response: BitmexApiInfo = self
507            .send_request::<_, ()>(Method::GET, "", None, None, false)
508            .await?;
509        Ok(response.timestamp)
510    }
511
512    /// Get the instrument definition for the specified symbol.
513    ///
514    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
515    /// a single symbol is requested. This helper returns the first element of
516    /// that array and yields `Ok(None)` when the venue returns an empty list
517    /// (e.g. unknown symbol).
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the request fails or the payload cannot be deserialized.
522    pub async fn get_instrument(
523        &self,
524        symbol: &str,
525    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526        let path = &format!("/instrument?symbol={symbol}");
527        let instruments: Vec<BitmexInstrument> = self
528            .send_request::<_, ()>(Method::GET, path, None, None, false)
529            .await?;
530
531        Ok(instruments.into_iter().next())
532    }
533
534    /// Get user wallet information.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
539    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540        let endpoint = "/user/wallet";
541        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542            .await
543    }
544
545    /// Get user margin information.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
550    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551        let path = format!("/user/margin?currency={currency}");
552        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553            .await
554    }
555
556    /// Get historical trades.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
561    ///
562    /// # Panics
563    ///
564    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
565    pub async fn get_trades(
566        &self,
567        params: GetTradeParams,
568    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569        self.send_request(Method::GET, "/trade", Some(&params), None, true)
570            .await
571    }
572
573    /// Get bucketed (aggregated) trade data.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
578    pub async fn get_trade_bucketed(
579        &self,
580        params: GetTradeBucketedParams,
581    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
583            .await
584    }
585
586    /// Get user orders.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
591    ///
592    /// # Panics
593    ///
594    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
595    pub async fn get_orders(
596        &self,
597        params: GetOrderParams,
598    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599        self.send_request(Method::GET, "/order", Some(&params), None, true)
600            .await
601    }
602
603    /// Place a new order.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
608    ///
609    /// # Panics
610    ///
611    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
612    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613        // BitMEX spec requires form-encoded body for POST /order
614        let body = serde_urlencoded::to_string(&params)
615            .map_err(|e| {
616                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617            })?
618            .into_bytes();
619        let path = "/order";
620        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621            .await
622    }
623
624    /// Cancel user orders.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
629    ///
630    /// # Panics
631    ///
632    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
633    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634        // BitMEX spec requires form-encoded body for DELETE /order
635        let body = serde_urlencoded::to_string(&params)
636            .map_err(|e| {
637                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638            })?
639            .into_bytes();
640        let path = "/order";
641        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642            .await
643    }
644
645    /// Amend an existing order.
646    ///
647    /// # Errors
648    ///
649    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
650    ///
651    /// # Panics
652    ///
653    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
654    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655        // BitMEX spec requires form-encoded body for PUT /order
656        let body = serde_urlencoded::to_string(&params)
657            .map_err(|e| {
658                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659            })?
660            .into_bytes();
661        let path = "/order";
662        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663            .await
664    }
665
666    /// Cancel all orders.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
671    ///
672    /// # Panics
673    ///
674    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
675    ///
676    /// # References
677    ///
678    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
679    pub async fn cancel_all_orders(
680        &self,
681        params: DeleteAllOrdersParams,
682    ) -> Result<Value, BitmexHttpError> {
683        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
684            .await
685    }
686
687    /// Get user executions.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
692    ///
693    /// # Panics
694    ///
695    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
696    pub async fn get_executions(
697        &self,
698        params: GetExecutionParams,
699    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700        let query = serde_urlencoded::to_string(&params).map_err(|e| {
701            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702        })?;
703        let path = format!("/execution/tradeHistory?{query}");
704        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705            .await
706    }
707
708    /// Get user positions.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
713    ///
714    /// # Panics
715    ///
716    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
717    pub async fn get_positions(
718        &self,
719        params: GetPositionParams,
720    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721        self.send_request(Method::GET, "/position", Some(&params), None, true)
722            .await
723    }
724
725    /// Update position leverage.
726    ///
727    /// # Errors
728    ///
729    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
730    ///
731    /// # Panics
732    ///
733    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
734    pub async fn update_position_leverage(
735        &self,
736        params: PostPositionLeverageParams,
737    ) -> Result<BitmexPosition, BitmexHttpError> {
738        // BitMEX spec requires form-encoded body for POST endpoints
739        let body = serde_urlencoded::to_string(&params)
740            .map_err(|e| {
741                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742            })?
743            .into_bytes();
744        let path = "/position/leverage";
745        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746            .await
747    }
748}
749
/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This is the high-level client that wraps the inner client and provides
/// Nautilus-specific functionality for trading operations.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
)]
pub struct BitmexHttpClient {
    /// Shared lower-level client that performs the HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Instruments keyed by `Ustr`; the map is shared between clones of this client.
    // NOTE(review): presumably keyed by instrument symbol — verify against the code that fills it.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Per-instance flag that starts out `false`; a clone copies the current value.
    // NOTE(review): presumably set once the instruments cache has been populated — confirm.
    cache_initialized: AtomicBool,
}
764
765impl Clone for BitmexHttpClient {
766    fn clone(&self) -> Self {
767        let cache_initialized = AtomicBool::new(false);
768
769        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770        if is_initialized {
771            cache_initialized.store(true, Ordering::Release);
772        }
773
774        Self {
775            inner: self.inner.clone(),
776            instruments_cache: self.instruments_cache.clone(),
777            cache_initialized,
778        }
779    }
780}
781
782impl Default for BitmexHttpClient {
783    fn default() -> Self {
784        Self::new(
785            None,
786            None,
787            None,
788            false,
789            Some(60),
790            None,
791            None,
792            None,
793            None,
794            None,
795            None,
796            None, // proxy_url
797        )
798        .expect("Failed to create default BitmexHttpClient")
799    }
800}
801
802impl BitmexHttpClient {
803    /// Creates a new [`BitmexHttpClient`] instance.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the HTTP client cannot be created.
808    #[allow(clippy::too_many_arguments)]
809    pub fn new(
810        base_url: Option<String>,
811        api_key: Option<String>,
812        api_secret: Option<String>,
813        testnet: bool,
814        timeout_secs: Option<u64>,
815        max_retries: Option<u32>,
816        retry_delay_ms: Option<u64>,
817        retry_delay_max_ms: Option<u64>,
818        recv_window_ms: Option<u64>,
819        max_requests_per_second: Option<u32>,
820        max_requests_per_minute: Option<u32>,
821        proxy_url: Option<String>,
822    ) -> Result<Self, BitmexHttpError> {
823        // Determine the base URL
824        let url = base_url.unwrap_or_else(|| {
825            if testnet {
826                BITMEX_HTTP_TESTNET_URL.to_string()
827            } else {
828                BITMEX_HTTP_URL.to_string()
829            }
830        });
831
832        let inner = match (api_key, api_secret) {
833            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834                key,
835                secret,
836                url,
837                timeout_secs,
838                max_retries,
839                retry_delay_ms,
840                retry_delay_max_ms,
841                recv_window_ms,
842                max_requests_per_second,
843                max_requests_per_minute,
844                proxy_url,
845            )?,
846            _ => BitmexRawHttpClient::new(
847                Some(url),
848                timeout_secs,
849                max_retries,
850                retry_delay_ms,
851                retry_delay_max_ms,
852                recv_window_ms,
853                max_requests_per_second,
854                max_requests_per_minute,
855                proxy_url,
856            )?,
857        };
858
859        Ok(Self {
860            inner: Arc::new(inner),
861            instruments_cache: Arc::new(DashMap::new()),
862            cache_initialized: AtomicBool::new(false),
863        })
864    }
865
866    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
867    /// the default BitMEX HTTP base URL.
868    ///
869    /// # Errors
870    ///
871    /// Returns an error if required environment variables are not set or invalid.
872    pub fn from_env() -> anyhow::Result<Self> {
873        Self::with_credentials(
874            None, None, None, None, None, None, None, None, None, None, None,
875        )
876        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
877    }
878
879    /// Creates a new [`BitmexHttpClient`] configured with credentials
880    /// for authenticated requests.
881    ///
882    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
883    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if one credential is provided without the other.
888    #[allow(clippy::too_many_arguments)]
889    pub fn with_credentials(
890        api_key: Option<String>,
891        api_secret: Option<String>,
892        base_url: Option<String>,
893        timeout_secs: Option<u64>,
894        max_retries: Option<u32>,
895        retry_delay_ms: Option<u64>,
896        retry_delay_max_ms: Option<u64>,
897        recv_window_ms: Option<u64>,
898        max_requests_per_second: Option<u32>,
899        max_requests_per_minute: Option<u32>,
900        proxy_url: Option<String>,
901    ) -> anyhow::Result<Self> {
902        // Determine testnet from URL first to select correct environment variables
903        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905        // Choose environment variables based on testnet flag
906        let (key_var, secret_var) = if testnet {
907            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908        } else {
909            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910        };
911
912        let api_key = get_or_env_var_opt(api_key, key_var);
913        let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915        // If we're trying to create an authenticated client, we need both key and secret
916        if api_key.is_some() && api_secret.is_none() {
917            anyhow::bail!("{secret_var} is required when {key_var} is provided");
918        }
919        if api_key.is_none() && api_secret.is_some() {
920            anyhow::bail!("{key_var} is required when {secret_var} is provided");
921        }
922
923        Self::new(
924            base_url,
925            api_key,
926            api_secret,
927            testnet,
928            timeout_secs,
929            max_retries,
930            retry_delay_ms,
931            retry_delay_max_ms,
932            recv_window_ms,
933            max_requests_per_second,
934            max_requests_per_minute,
935            proxy_url,
936        )
937        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938    }
939
    /// Returns the base URL being used by the client.
    ///
    /// The value is read from the inner raw client.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url.as_str()
    }
945
    /// Returns the public API key being used by the client.
    ///
    /// Returns `None` when the inner client holds no credential.
    #[must_use]
    pub fn api_key(&self) -> Option<&str> {
        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
    }
951
    /// Returns a masked version of the API key for logging purposes.
    ///
    /// Returns `None` when the inner client holds no credential; the masking itself
    /// is performed by the credential's `api_key_masked` method.
    #[must_use]
    pub fn api_key_masked(&self) -> Option<String> {
        self.inner.credential.as_ref().map(|c| c.api_key_masked())
    }
957
    /// Requests the current server time from BitMEX.
    ///
    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
    ///
    /// This is a thin wrapper that delegates to the inner raw client.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
        self.inner.get_server_time().await
    }
968
    /// Generates a timestamp for initialization.
    ///
    /// Reads the current time in nanoseconds from the realtime atomic clock.
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
973
    /// Check if the order has a contingency type that requires linking.
    ///
    /// Returns `true` for `Oco`, `Oto` and `Ouo`, and `false` for any other
    /// contingency type.
    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
        matches!(
            contingency_type,
            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
        )
    }
981
    /// Check if the order is a parent in contingency relationships.
    ///
    /// Returns `true` for `Oco` and `Oto`, and `false` for any other contingency
    /// type (including `Ouo`).
    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
        matches!(
            contingency_type,
            ContingencyType::Oco | ContingencyType::Oto
        )
    }
989
    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
    ///
    /// Reports are grouped in two ways: by `order_list_id`, and (for contingent orders
    /// only) by the part of the client order ID that precedes its last `-`. Every
    /// contingent report without linked IDs is then linked to the other members of its
    /// order list group, falling back to its prefix group when the order list group
    /// yields no peers. A contingent report with no peers in either group is downgraded
    /// to `ContingencyType::NoContingency` and a warning is logged.
    ///
    /// Reports that have no client order ID, already carry linked IDs, or are not
    /// contingent are left untouched.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // First pass: collect the members of every group, plus the first parent
        // candidate encountered in each group
        for report in reports.iter() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                // `or_insert` keeps the first parent seen for the group
                if Self::is_parent_contingency(report.contingency_type) {
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            // Prefix grouping only considers contingent orders whose ID contains a `-`
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Second pass: fill in the links for each eligible report
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Links already present on the report are never overwritten
            if report.linked_order_ids.is_some() {
                continue;
            }

            // Only process contingent orders
            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            // Preferred source: peers sharing the same order list ID
            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Stable sort: the parent (key 0) moves ahead of the other peers (key 1)
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
                        client_order_id,
                        order_list_id,
                        report.contingency_type,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                log::trace!(
                    "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
                    client_order_id,
                    order_list_id,
                    report.contingency_type,
                    group,
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Fallback source: peers sharing the same client order ID prefix
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Stable sort: the parent (key 0) moves ahead of the other peers (key 1)
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
                        client_order_id,
                        report.contingency_type,
                        base,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                log::trace!(
                    "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
                    client_order_id,
                    report.contingency_type,
                    base,
                    group,
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // No peers were found in either group. This condition always holds here,
            // because non-contingent reports were skipped at the top of the loop.
            if Self::is_contingent_order(report.contingency_type) {
                log::warn!(
                    "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
                    report.client_order_id,
                    report.order_list_id,
                    report.contingency_type,
                );
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1147
    /// Cancel all pending HTTP requests.
    ///
    /// Delegates to the inner raw client's `cancel_all_requests`.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1152
    /// Get the cancellation token for this client.
    ///
    /// Returns a clone of the token held by the inner raw client.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.inner.cancellation_token().clone()
    }
1157
    /// Caches a single instrument.
    ///
    /// Any existing instrument with the same symbol will be replaced.
    ///
    /// The instrument is keyed by its raw symbol, and this handle's
    /// `cache_initialized` flag is set afterwards.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        self.instruments_cache
            .insert(instrument.raw_symbol().inner(), instrument);
        // Release store; the flag is read with an Acquire load when the client is cloned
        self.cache_initialized.store(true, Ordering::Release);
    }
1166
1167    /// Gets an instrument from the cache by symbol.
1168    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169        self.instruments_cache
1170            .get(symbol)
1171            .map(|entry| entry.value().clone())
1172    }
1173
1174    /// Request a single instrument and parse it into a Nautilus type.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1179    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1180    /// cannot be converted into a Nautilus `Instrument`.
1181    pub async fn request_instrument(
1182        &self,
1183        instrument_id: InstrumentId,
1184    ) -> anyhow::Result<Option<InstrumentAny>> {
1185        let response = self
1186            .inner
1187            .get_instrument(instrument_id.symbol.as_str())
1188            .await?;
1189
1190        let instrument = match response {
1191            Some(instrument) => instrument,
1192            None => return Ok(None),
1193        };
1194
1195        let ts_init = self.generate_ts_init();
1196
1197        match parse_instrument_any(&instrument, ts_init) {
1198            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199            InstrumentParseResult::Unsupported {
1200                symbol,
1201                instrument_type,
1202            } => {
1203                log::debug!(
1204                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205                );
1206                Ok(None)
1207            }
1208            InstrumentParseResult::Inactive { symbol, state } => {
1209                log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1210                Ok(None)
1211            }
1212            InstrumentParseResult::Failed {
1213                symbol,
1214                instrument_type,
1215                error,
1216            } => {
1217                log::error!(
1218                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1219                );
1220                Ok(None)
1221            }
1222        }
1223    }
1224
1225    /// Request all available instruments and parse them into Nautilus types.
1226    ///
1227    /// # Errors
1228    ///
1229    /// Returns an error if the HTTP request fails or parsing fails.
1230    pub async fn request_instruments(
1231        &self,
1232        active_only: bool,
1233    ) -> anyhow::Result<Vec<InstrumentAny>> {
1234        let instruments = self.inner.get_instruments(active_only).await?;
1235        let ts_init = self.generate_ts_init();
1236
1237        let mut parsed_instruments = Vec::new();
1238        let mut skipped_count = 0;
1239        let mut inactive_count = 0;
1240        let mut failed_count = 0;
1241        let total_count = instruments.len();
1242
1243        for inst in instruments {
1244            match parse_instrument_any(&inst, ts_init) {
1245                InstrumentParseResult::Ok(instrument_any) => {
1246                    parsed_instruments.push(*instrument_any);
1247                }
1248                InstrumentParseResult::Unsupported {
1249                    symbol,
1250                    instrument_type,
1251                } => {
1252                    skipped_count += 1;
1253                    log::debug!(
1254                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1255                    );
1256                }
1257                InstrumentParseResult::Inactive { symbol, state } => {
1258                    inactive_count += 1;
1259                    log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1260                }
1261                InstrumentParseResult::Failed {
1262                    symbol,
1263                    instrument_type,
1264                    error,
1265                } => {
1266                    failed_count += 1;
1267                    log::error!(
1268                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1269                    );
1270                }
1271            }
1272        }
1273
1274        if skipped_count > 0 {
1275            log::info!(
1276                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1277            );
1278        }
1279
1280        if inactive_count > 0 {
1281            log::info!(
1282                "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1283            );
1284        }
1285
1286        if failed_count > 0 {
1287            log::error!(
1288                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1289                parsed_instruments.len()
1290            );
1291        }
1292
1293        Ok(parsed_instruments)
1294    }
1295
1296    /// Get user wallet information.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1301    ///
1302    /// # Panics
1303    ///
1304    /// Panics if the inner mutex is poisoned.
1305    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1306        let inner = self.inner.clone();
1307        inner.get_wallet().await
1308    }
1309
1310    /// Get user orders.
1311    ///
1312    /// # Errors
1313    ///
1314    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1315    ///
1316    /// # Panics
1317    ///
1318    /// Panics if the inner mutex is poisoned.
1319    pub async fn get_orders(
1320        &self,
1321        params: GetOrderParams,
1322    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1323        let inner = self.inner.clone();
1324        inner.get_orders(params).await
1325    }
1326
1327    /// Get instrument from the instruments cache (if found).
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if the instrument is not found in the cache.
1332    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1333        self.get_instrument(&symbol).ok_or_else(|| {
1334            anyhow::anyhow!(
1335                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1336            )
1337        })
1338    }
1339
1340    /// Returns the cached price precision for the given symbol.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if the instrument was never cached (for example, if
1345    /// instruments were not loaded prior to use).
1346    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1347        self.instrument_from_cache(symbol)
1348            .map(|instrument| instrument.price_precision())
1349    }
1350
1351    /// Get user margin information.
1352    ///
1353    /// # Errors
1354    ///
1355    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1356    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1357        self.inner
1358            .get_margin(currency)
1359            .await
1360            .map_err(|e| anyhow::anyhow!(e))
1361    }
1362
1363    /// Request account state for the given account.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if the HTTP request fails or no account state is returned.
1368    pub async fn request_account_state(
1369        &self,
1370        account_id: AccountId,
1371    ) -> anyhow::Result<AccountState> {
1372        // Get margin data for XBt (Bitcoin) by default
1373        let margin = self
1374            .inner
1375            .get_margin("XBt")
1376            .await
1377            .map_err(|e| anyhow::anyhow!(e))?;
1378
1379        let ts_init =
1380            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1381
1382        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1383        let margin_msg = BitmexMarginMsg {
1384            account: margin.account,
1385            currency: margin.currency,
1386            risk_limit: margin.risk_limit,
1387            amount: margin.amount,
1388            prev_realised_pnl: margin.prev_realised_pnl,
1389            gross_comm: margin.gross_comm,
1390            gross_open_cost: margin.gross_open_cost,
1391            gross_open_premium: margin.gross_open_premium,
1392            gross_exec_cost: margin.gross_exec_cost,
1393            gross_mark_value: margin.gross_mark_value,
1394            risk_value: margin.risk_value,
1395            init_margin: margin.init_margin,
1396            maint_margin: margin.maint_margin,
1397            target_excess_margin: margin.target_excess_margin,
1398            realised_pnl: margin.realised_pnl,
1399            unrealised_pnl: margin.unrealised_pnl,
1400            wallet_balance: margin.wallet_balance,
1401            margin_balance: margin.margin_balance,
1402            margin_leverage: margin.margin_leverage,
1403            margin_used_pcnt: margin.margin_used_pcnt,
1404            excess_margin: margin.excess_margin,
1405            available_margin: margin.available_margin,
1406            withdrawable_margin: margin.withdrawable_margin,
1407            maker_fee_discount: None, // Not in HTTP response
1408            taker_fee_discount: None, // Not in HTTP response
1409            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1410            foreign_margin_balance: None,
1411            foreign_requirement: None,
1412        };
1413
1414        parse_account_state(&margin_msg, account_id, ts_init)
1415    }
1416
1417    /// Submit a new order.
1418    ///
1419    /// # Errors
1420    ///
1421    /// Returns an error if credentials are missing, the request fails, order validation fails,
1422    /// the order is rejected, or the API returns an error.
1423    #[allow(clippy::too_many_arguments)]
1424    pub async fn submit_order(
1425        &self,
1426        instrument_id: InstrumentId,
1427        client_order_id: ClientOrderId,
1428        order_side: OrderSide,
1429        order_type: OrderType,
1430        quantity: Quantity,
1431        time_in_force: TimeInForce,
1432        price: Option<Price>,
1433        trigger_price: Option<Price>,
1434        trigger_type: Option<TriggerType>,
1435        display_qty: Option<Quantity>,
1436        post_only: bool,
1437        reduce_only: bool,
1438        order_list_id: Option<OrderListId>,
1439        contingency_type: Option<ContingencyType>,
1440    ) -> anyhow::Result<OrderStatusReport> {
1441        use crate::common::enums::{
1442            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1443        };
1444
1445        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1446
1447        let mut params = super::query::PostOrderParamsBuilder::default();
1448        params.text(NAUTILUS_TRADER);
1449        params.symbol(instrument_id.symbol.as_str());
1450        params.cl_ord_id(client_order_id.as_str());
1451
1452        let side = BitmexSide::try_from_order_side(order_side)?;
1453        params.side(side);
1454
1455        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1456        params.ord_type(ord_type);
1457
1458        params.order_qty(quantity_to_u32(&quantity, &instrument));
1459
1460        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1461        params.time_in_force(tif);
1462
1463        if let Some(price) = price {
1464            params.price(price.as_f64());
1465        }
1466
1467        if let Some(trigger_price) = trigger_price {
1468            params.stop_px(trigger_price.as_f64());
1469        }
1470
1471        if let Some(display_qty) = display_qty {
1472            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1473        }
1474
1475        if let Some(order_list_id) = order_list_id {
1476            params.cl_ord_link_id(order_list_id.as_str());
1477        }
1478
1479        let mut exec_inst = Vec::new();
1480
1481        if post_only {
1482            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1483        }
1484
1485        if reduce_only {
1486            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1487        }
1488
1489        if trigger_price.is_some()
1490            && let Some(trigger_type) = trigger_type
1491        {
1492            match trigger_type {
1493                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1494                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1495                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1496                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1497            }
1498        }
1499
1500        if !exec_inst.is_empty() {
1501            params.exec_inst(exec_inst);
1502        }
1503
1504        if let Some(contingency_type) = contingency_type {
1505            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1506            params.contingency_type(bitmex_contingency);
1507        }
1508
1509        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1510
1511        let response = self.inner.place_order(params).await?;
1512
1513        let order: BitmexOrder = serde_json::from_value(response)?;
1514
1515        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1516            let reason = order
1517                .ord_rej_reason
1518                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1519            anyhow::bail!("Order rejected: {reason}");
1520        }
1521
1522        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1523        let ts_init = self.generate_ts_init();
1524
1525        parse_order_status_report(&order, &instrument, ts_init)
1526    }
1527
1528    /// Cancel an order.
1529    ///
1530    /// # Errors
1531    ///
1532    /// Returns an error if:
1533    /// - Credentials are missing.
1534    /// - The request fails.
1535    /// - The order doesn't exist.
1536    /// - The API returns an error.
1537    pub async fn cancel_order(
1538        &self,
1539        instrument_id: InstrumentId,
1540        client_order_id: Option<ClientOrderId>,
1541        venue_order_id: Option<VenueOrderId>,
1542    ) -> anyhow::Result<OrderStatusReport> {
1543        let mut params = super::query::DeleteOrderParamsBuilder::default();
1544        params.text(NAUTILUS_TRADER);
1545
1546        if let Some(venue_order_id) = venue_order_id {
1547            params.order_id(vec![venue_order_id.as_str().to_string()]);
1548        } else if let Some(client_order_id) = client_order_id {
1549            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1550        } else {
1551            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1552        }
1553
1554        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1555
1556        let response = self.inner.cancel_orders(params).await?;
1557
1558        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1559        let order = orders
1560            .into_iter()
1561            .next()
1562            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1563
1564        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1565        let ts_init = self.generate_ts_init();
1566
1567        parse_order_status_report(&order, &instrument, ts_init)
1568    }
1569
1570    /// Cancel multiple orders.
1571    ///
1572    /// # Errors
1573    ///
1574    /// Returns an error if:
1575    /// - Credentials are missing.
1576    /// - The request fails.
1577    /// - The order doesn't exist.
1578    /// - The API returns an error.
1579    pub async fn cancel_orders(
1580        &self,
1581        instrument_id: InstrumentId,
1582        client_order_ids: Option<Vec<ClientOrderId>>,
1583        venue_order_ids: Option<Vec<VenueOrderId>>,
1584    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1585        let mut params = super::query::DeleteOrderParamsBuilder::default();
1586        params.text(NAUTILUS_TRADER);
1587
1588        // BitMEX API requires either client order IDs or venue order IDs, not both
1589        // Prioritize venue order IDs if both are provided
1590        if let Some(venue_order_ids) = venue_order_ids {
1591            if venue_order_ids.is_empty() {
1592                anyhow::bail!("venue_order_ids cannot be empty");
1593            }
1594            params.order_id(
1595                venue_order_ids
1596                    .iter()
1597                    .map(|id| id.to_string())
1598                    .collect::<Vec<_>>(),
1599            );
1600        } else if let Some(client_order_ids) = client_order_ids {
1601            if client_order_ids.is_empty() {
1602                anyhow::bail!("client_order_ids cannot be empty");
1603            }
1604            params.cl_ord_id(
1605                client_order_ids
1606                    .iter()
1607                    .map(|id| id.to_string())
1608                    .collect::<Vec<_>>(),
1609            );
1610        } else {
1611            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1612        }
1613
1614        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1615
1616        let response = self.inner.cancel_orders(params).await?;
1617
1618        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1619
1620        let ts_init = self.generate_ts_init();
1621        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1622
1623        let mut reports = Vec::new();
1624
1625        for order in orders {
1626            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1627        }
1628
1629        Self::populate_linked_order_ids(&mut reports);
1630
1631        Ok(reports)
1632    }
1633
1634    /// Cancel all orders for an instrument and optionally an order side.
1635    ///
1636    /// # Errors
1637    ///
1638    /// Returns an error if:
1639    /// - Credentials are missing.
1640    /// - The request fails.
1641    /// - The order doesn't exist.
1642    /// - The API returns an error.
1643    pub async fn cancel_all_orders(
1644        &self,
1645        instrument_id: InstrumentId,
1646        order_side: Option<OrderSide>,
1647    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1648        let mut params = DeleteAllOrdersParamsBuilder::default();
1649        params.text(NAUTILUS_TRADER);
1650        params.symbol(instrument_id.symbol.as_str());
1651
1652        if let Some(side) = order_side {
1653            let side = BitmexSide::try_from_order_side(side)?;
1654            params.filter(serde_json::json!({
1655                "side": side
1656            }));
1657        }
1658
1659        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1660
1661        let response = self.inner.cancel_all_orders(params).await?;
1662
1663        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1664
1665        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1666        let ts_init = self.generate_ts_init();
1667
1668        let mut reports = Vec::new();
1669
1670        for order in orders {
1671            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1672        }
1673
1674        Self::populate_linked_order_ids(&mut reports);
1675
1676        Ok(reports)
1677    }
1678
1679    /// Modify an existing order.
1680    ///
1681    /// # Errors
1682    ///
1683    /// Returns an error if:
1684    /// - Credentials are missing.
1685    /// - The request fails.
1686    /// - The order doesn't exist.
1687    /// - The order is already closed.
1688    /// - The API returns an error.
1689    pub async fn modify_order(
1690        &self,
1691        instrument_id: InstrumentId,
1692        client_order_id: Option<ClientOrderId>,
1693        venue_order_id: Option<VenueOrderId>,
1694        quantity: Option<Quantity>,
1695        price: Option<Price>,
1696        trigger_price: Option<Price>,
1697    ) -> anyhow::Result<OrderStatusReport> {
1698        let mut params = PutOrderParamsBuilder::default();
1699        params.text(NAUTILUS_TRADER);
1700
1701        // Set order ID - prefer venue_order_id if available
1702        if let Some(venue_order_id) = venue_order_id {
1703            params.order_id(venue_order_id.as_str());
1704        } else if let Some(client_order_id) = client_order_id {
1705            params.orig_cl_ord_id(client_order_id.as_str());
1706        } else {
1707            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1708        }
1709
1710        if let Some(quantity) = quantity {
1711            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1712            params.order_qty(quantity_to_u32(&quantity, &instrument));
1713        }
1714
1715        if let Some(price) = price {
1716            params.price(price.as_f64());
1717        }
1718
1719        if let Some(trigger_price) = trigger_price {
1720            params.stop_px(trigger_price.as_f64());
1721        }
1722
1723        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1724
1725        let response = self.inner.amend_order(params).await?;
1726
1727        let order: BitmexOrder = serde_json::from_value(response)?;
1728
1729        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1730            let reason = order
1731                .ord_rej_reason
1732                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1733            anyhow::bail!("Order modification rejected: {reason}");
1734        }
1735
1736        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1737        let ts_init = self.generate_ts_init();
1738
1739        parse_order_status_report(&order, &instrument, ts_init)
1740    }
1741
1742    /// Query a single order by client order ID or venue order ID.
1743    ///
1744    /// # Errors
1745    ///
1746    /// Returns an error if:
1747    /// - Credentials are missing.
1748    /// - The request fails.
1749    /// - The API returns an error.
1750    pub async fn query_order(
1751        &self,
1752        instrument_id: InstrumentId,
1753        client_order_id: Option<ClientOrderId>,
1754        venue_order_id: Option<VenueOrderId>,
1755    ) -> anyhow::Result<Option<OrderStatusReport>> {
1756        let mut params = GetOrderParamsBuilder::default();
1757
1758        let filter_json = if let Some(client_order_id) = client_order_id {
1759            serde_json::json!({
1760                "clOrdID": client_order_id.to_string()
1761            })
1762        } else if let Some(venue_order_id) = venue_order_id {
1763            serde_json::json!({
1764                "orderID": venue_order_id.to_string()
1765            })
1766        } else {
1767            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1768        };
1769
1770        params.filter(filter_json);
1771        params.count(1); // Only need one order
1772
1773        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775        let response = self.inner.get_orders(params).await?;
1776
1777        if response.is_empty() {
1778            return Ok(None);
1779        }
1780
1781        let order = &response[0];
1782
1783        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1784        let ts_init = self.generate_ts_init();
1785
1786        let report = parse_order_status_report(order, &instrument, ts_init)?;
1787
1788        Ok(Some(report))
1789    }
1790
1791    /// Request a single order status report.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if:
1796    /// - Credentials are missing.
1797    /// - The request fails.
1798    /// - The API returns an error.
1799    pub async fn request_order_status_report(
1800        &self,
1801        instrument_id: InstrumentId,
1802        client_order_id: Option<ClientOrderId>,
1803        venue_order_id: Option<VenueOrderId>,
1804    ) -> anyhow::Result<OrderStatusReport> {
1805        let mut params = GetOrderParamsBuilder::default();
1806        params.symbol(instrument_id.symbol.as_str());
1807
1808        if let Some(venue_order_id) = venue_order_id {
1809            params.filter(serde_json::json!({
1810                "orderID": venue_order_id.as_str()
1811            }));
1812        } else if let Some(client_order_id) = client_order_id {
1813            params.filter(serde_json::json!({
1814                "clOrdID": client_order_id.as_str()
1815            }));
1816        }
1817
1818        params.count(1i32);
1819        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1820
1821        let response = self.inner.get_orders(params).await?;
1822
1823        let order = response
1824            .into_iter()
1825            .next()
1826            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1827
1828        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1829        let ts_init = self.generate_ts_init();
1830
1831        parse_order_status_report(&order, &instrument, ts_init)
1832    }
1833
1834    /// Request multiple order status reports.
1835    ///
1836    /// # Errors
1837    ///
1838    /// Returns an error if:
1839    /// - Credentials are missing.
1840    /// - The request fails.
1841    /// - The API returns an error.
1842    pub async fn request_order_status_reports(
1843        &self,
1844        instrument_id: Option<InstrumentId>,
1845        open_only: bool,
1846        limit: Option<u32>,
1847    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1848        let mut params = GetOrderParamsBuilder::default();
1849
1850        if let Some(instrument_id) = &instrument_id {
1851            params.symbol(instrument_id.symbol.as_str());
1852        }
1853
1854        if open_only {
1855            params.filter(serde_json::json!({
1856                "open": true
1857            }));
1858        }
1859
1860        if let Some(limit) = limit {
1861            params.count(limit as i32);
1862        } else {
1863            params.count(500); // Default count to avoid empty query
1864        }
1865
1866        params.reverse(true); // Get newest orders first
1867
1868        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1869
1870        let response = self.inner.get_orders(params).await?;
1871
1872        let ts_init = self.generate_ts_init();
1873
1874        let mut reports = Vec::new();
1875
1876        for order in response {
1877            // Skip orders without symbol (can happen with query responses)
1878            let Some(symbol) = order.symbol else {
1879                log::warn!("Order response missing symbol, skipping");
1880                continue;
1881            };
1882
1883            let Ok(instrument) = self.instrument_from_cache(symbol) else {
1884                log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
1885                continue;
1886            };
1887
1888            match parse_order_status_report(&order, &instrument, ts_init) {
1889                Ok(report) => reports.push(report),
1890                Err(e) => log::error!("Failed to parse order status report: {e}"),
1891            }
1892        }
1893
1894        Self::populate_linked_order_ids(&mut reports);
1895
1896        Ok(reports)
1897    }
1898
1899    /// Request trades for the given instrument.
1900    ///
1901    /// # Errors
1902    ///
1903    /// Returns an error if the HTTP request fails or parsing fails.
1904    pub async fn request_trades(
1905        &self,
1906        instrument_id: InstrumentId,
1907        start: Option<DateTime<Utc>>,
1908        end: Option<DateTime<Utc>>,
1909        limit: Option<u32>,
1910    ) -> anyhow::Result<Vec<TradeTick>> {
1911        let mut params = GetTradeParamsBuilder::default();
1912        params.symbol(instrument_id.symbol.as_str());
1913
1914        if let Some(start) = start {
1915            params.start_time(start);
1916        }
1917
1918        if let Some(end) = end {
1919            params.end_time(end);
1920        }
1921
1922        if let (Some(start), Some(end)) = (start, end) {
1923            anyhow::ensure!(
1924                start < end,
1925                "Invalid time range: start={start:?} end={end:?}",
1926            );
1927        }
1928
1929        if let Some(limit) = limit {
1930            let clamped_limit = limit.min(1000);
1931            if limit > 1000 {
1932                log::warn!(
1933                    "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
1934                );
1935            }
1936            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1937        }
1938        params.reverse(false);
1939        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1940
1941        let response = self.inner.get_trades(params).await?;
1942
1943        let ts_init = self.generate_ts_init();
1944
1945        let mut parsed_trades = Vec::new();
1946
1947        for trade in response {
1948            if let Some(start) = start
1949                && trade.timestamp < start
1950            {
1951                continue;
1952            }
1953
1954            if let Some(end) = end
1955                && trade.timestamp > end
1956            {
1957                continue;
1958            }
1959
1960            let price_precision = self.get_price_precision(trade.symbol)?;
1961
1962            match parse_trade(trade, price_precision, ts_init) {
1963                Ok(trade) => parsed_trades.push(trade),
1964                Err(e) => log::error!("Failed to parse trade: {e}"),
1965            }
1966        }
1967
1968        Ok(parsed_trades)
1969    }
1970
1971    /// Request bars for the given bar type.
1972    ///
1973    /// # Errors
1974    ///
1975    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1976    /// unsupported by BitMEX.
1977    pub async fn request_bars(
1978        &self,
1979        mut bar_type: BarType,
1980        start: Option<DateTime<Utc>>,
1981        end: Option<DateTime<Utc>>,
1982        limit: Option<u32>,
1983        partial: bool,
1984    ) -> anyhow::Result<Vec<Bar>> {
1985        bar_type = bar_type.standard();
1986
1987        anyhow::ensure!(
1988            bar_type.aggregation_source() == AggregationSource::External,
1989            "Only EXTERNAL aggregation bars are supported"
1990        );
1991        anyhow::ensure!(
1992            bar_type.spec().price_type == PriceType::Last,
1993            "Only LAST price type bars are supported"
1994        );
1995        if let (Some(start), Some(end)) = (start, end) {
1996            anyhow::ensure!(
1997                start < end,
1998                "Invalid time range: start={start:?} end={end:?}"
1999            );
2000        }
2001
2002        let spec = bar_type.spec();
2003        let bin_size = match (spec.aggregation, spec.step.get()) {
2004            (BarAggregation::Minute, 1) => "1m",
2005            (BarAggregation::Minute, 5) => "5m",
2006            (BarAggregation::Hour, 1) => "1h",
2007            (BarAggregation::Day, 1) => "1d",
2008            _ => anyhow::bail!(
2009                "BitMEX does not support {}-{:?}-{:?} bars",
2010                spec.step.get(),
2011                spec.aggregation,
2012                spec.price_type,
2013            ),
2014        };
2015
2016        let instrument_id = bar_type.instrument_id();
2017        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2018
2019        let mut params = GetTradeBucketedParamsBuilder::default();
2020        params.symbol(instrument_id.symbol.as_str());
2021        params.bin_size(bin_size);
2022        if partial {
2023            params.partial(true);
2024        }
2025        if let Some(start) = start {
2026            params.start_time(start);
2027        }
2028        if let Some(end) = end {
2029            params.end_time(end);
2030        }
2031        if let Some(limit) = limit {
2032            let clamped_limit = limit.min(1000);
2033            if limit > 1000 {
2034                log::warn!(
2035                    "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2036                );
2037            }
2038            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2039        }
2040        params.reverse(false);
2041        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2042
2043        let response = self.inner.get_trade_bucketed(params).await?;
2044        let ts_init = self.generate_ts_init();
2045        let mut bars = Vec::new();
2046
2047        for bin in response {
2048            if let Some(start) = start
2049                && bin.timestamp < start
2050            {
2051                continue;
2052            }
2053            if let Some(end) = end
2054                && bin.timestamp > end
2055            {
2056                continue;
2057            }
2058            if bin.symbol != instrument_id.symbol.inner() {
2059                log::warn!(
2060                    "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2061                    bin.symbol,
2062                    instrument_id.symbol,
2063                );
2064                continue;
2065            }
2066
2067            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2068                Ok(bar) => bars.push(bar),
2069                Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2070            }
2071        }
2072
2073        Ok(bars)
2074    }
2075
2076    /// Request fill reports for the given instrument.
2077    ///
2078    /// # Errors
2079    ///
2080    /// Returns an error if the HTTP request fails or parsing fails.
2081    pub async fn request_fill_reports(
2082        &self,
2083        instrument_id: Option<InstrumentId>,
2084        limit: Option<u32>,
2085    ) -> anyhow::Result<Vec<FillReport>> {
2086        let mut params = GetExecutionParamsBuilder::default();
2087        if let Some(instrument_id) = instrument_id {
2088            params.symbol(instrument_id.symbol.as_str());
2089        }
2090        if let Some(limit) = limit {
2091            params.count(limit as i32);
2092        } else {
2093            params.count(500); // Default count
2094        }
2095        params.reverse(true); // Get newest fills first
2096
2097        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2098
2099        let response = self.inner.get_executions(params).await?;
2100
2101        let ts_init = self.generate_ts_init();
2102
2103        let mut reports = Vec::new();
2104
2105        for exec in response {
2106            // Skip executions without symbol (e.g., CancelReject)
2107            let Some(symbol) = exec.symbol else {
2108                log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2109                continue;
2110            };
2111            let symbol_str = symbol.to_string();
2112
2113            let instrument = match self.instrument_from_cache(symbol) {
2114                Ok(instrument) => instrument,
2115                Err(e) => {
2116                    log::error!(
2117                        "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2118                    );
2119                    continue;
2120                }
2121            };
2122
2123            match parse_fill_report(exec, &instrument, ts_init) {
2124                Ok(report) => reports.push(report),
2125                Err(e) => {
2126                    // Log at debug level for expected skip cases
2127                    let error_msg = e.to_string();
2128                    if error_msg.starts_with("Skipping non-trade execution")
2129                        || error_msg.starts_with("Skipping execution without order_id")
2130                    {
2131                        log::debug!("{e}");
2132                    } else {
2133                        log::error!("Failed to parse fill report: {e}");
2134                    }
2135                }
2136            }
2137        }
2138
2139        Ok(reports)
2140    }
2141
2142    /// Request position reports.
2143    ///
2144    /// # Errors
2145    ///
2146    /// Returns an error if the HTTP request fails or parsing fails.
2147    pub async fn request_position_status_reports(
2148        &self,
2149    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2150        let params = GetPositionParamsBuilder::default()
2151            .count(500) // Default count
2152            .build()
2153            .map_err(|e| anyhow::anyhow!(e))?;
2154
2155        let response = self.inner.get_positions(params).await?;
2156
2157        let ts_init = self.generate_ts_init();
2158
2159        let mut reports = Vec::new();
2160
2161        for pos in response {
2162            let symbol = pos.symbol;
2163            let instrument = match self.instrument_from_cache(symbol) {
2164                Ok(instrument) => instrument,
2165                Err(e) => {
2166                    log::error!(
2167                        "Instrument not found in cache for position parsing: symbol={}, {e}",
2168                        pos.symbol.as_str(),
2169                    );
2170                    continue;
2171                }
2172            };
2173
2174            match parse_position_report(pos, &instrument, ts_init) {
2175                Ok(report) => reports.push(report),
2176                Err(e) => log::error!("Failed to parse position report: {e}"),
2177            }
2178        }
2179
2180        Ok(reports)
2181    }
2182
2183    /// Update position leverage.
2184    ///
2185    /// # Errors
2186    ///
2187    /// - Credentials are missing.
2188    /// - The request fails.
2189    /// - The API returns an error.
2190    pub async fn update_position_leverage(
2191        &self,
2192        symbol: &str,
2193        leverage: f64,
2194    ) -> anyhow::Result<PositionStatusReport> {
2195        let params = PostPositionLeverageParams {
2196            symbol: symbol.to_string(),
2197            leverage,
2198            target_account_id: None,
2199        };
2200
2201        let response = self.inner.update_position_leverage(params).await?;
2202
2203        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2204        let ts_init = self.generate_ts_init();
2205
2206        parse_position_report(response, &instrument, ts_init)
2207    }
2208}
2209
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Builds an accepted BUY limit report for `XBTUSD.BITMEX` with the given identifiers,
    /// contingency type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        let with_list = match order_list_id {
            Some(id) => base.with_order_list_id(OrderListId::from(id)),
            None => base,
        };

        with_list.with_contingency_type(contingency_type)
    }

    /// Wraps client order ID strings in the `Some(Vec<_>)` shape of `linked_order_ids`.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let raw_client = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create test client");

        let signed = raw_client
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        // All three authentication headers must be present
        for name in ["api-key", "api-signature", "api-expires"] {
            assert!(signed.contains_key(name));
        }
        assert_eq!(signed.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let raw_client = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create test client");

        let payload = json!({"symbol": "XBTUSD", "orderQty": 100});
        let payload_bytes = serde_json::to_vec(&payload).unwrap();

        let bare_headers = raw_client
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let body_headers = raw_client
            .sign_request(&Method::POST, "/api/v1/order", Some(&payload_bytes))
            .unwrap();

        // Including the body must change the signature
        let bare_signature = bare_headers.get("api-signature").unwrap();
        let body_signature = body_headers.get("api-signature").unwrap();
        assert_ne!(bare_signature, body_signature);
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        let client_default = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None, // Use default recv_window_ms (10000ms = 10s)
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create test client");

        let client_custom = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            Some(30_000), // 30 seconds
            None,         // max_requests_per_second
            None,         // max_requests_per_minute
            None,         // proxy_url
        )
        .expect("Failed to create test client");

        // Signs a request and extracts the `api-expires` timestamp from the headers
        let expiry_of = |client: &BitmexRawHttpClient| -> i64 {
            let headers = client
                .sign_request(&Method::GET, "/api/v1/order", None)
                .unwrap();
            headers.get("api-expires").unwrap().parse().unwrap()
        };

        let expires_default = expiry_of(&client_default);
        let expires_custom = expiry_of(&client_custom);

        // Both expiries must lie in the future
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        // The longer window must expire later
        assert!(expires_custom > expires_default);

        // The difference should be approximately 20 seconds (30s - 10s)
        // Allow wider tolerance for delays between calls on slow CI runners
        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Each report links to the other two members of the order list
        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // No order list ID is set on any of these reports
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Non-contingent orders should not be linked
        assert!(reports[0].linked_order_ids.is_none());

        // A contingent order with no other contingent peers should have contingency reset
        assert!(reports[1].linked_order_ids.is_none());
        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
    }
}