nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46        TimeInForce, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65    error::{BitmexErrorResponse, BitmexHttpError},
66    models::{
67        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69    },
70    query::{
71        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74        PostPositionLeverageParams, PutOrderParams,
75    },
76};
77use crate::{
78    common::{
79        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80        credential::Credential,
81        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82        parse::{parse_account_state, quantity_to_u32},
83    },
84    http::{
85        parse::{
86            InstrumentParseResult, parse_fill_report, parse_instrument_any,
87            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88        },
89        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90    },
91    websocket::messages::BitmexMarginMsg,
92};
93
94/// Default BitMEX REST API rate limits.
95///
96/// BitMEX implements a dual-layer rate limiting system:
97/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
98/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
99const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
100const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
101const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
102
103const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
104const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
105
106static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
107    vec![
108        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
109        Ustr::from(BITMEX_MINUTE_RATE_KEY),
110    ]
111});
112
113/// Represents a BitMEX HTTP response.
114#[derive(Debug, Serialize, Deserialize)]
115pub struct BitmexResponse<T> {
116    /// The typed data returned by the BitMEX endpoint.
117    pub data: Vec<T>,
118}
119
120/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
121///
122/// This client wraps the underlying [`HttpClient`] to handle functionality
123/// specific to BitMEX, such as request signing (for authenticated endpoints),
124/// forming request URLs, and deserializing responses into specific data models.
125///
126/// # Connection Management
127///
128/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
129/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
130/// reused for subsequent requests to minimize latency.
131///
132/// # Rate Limiting
133///
134/// BitMEX enforces the following rate limits:
135/// - 120 requests per minute for authenticated users (30 for unauthenticated).
136/// - 10 requests per second burst limit for certain endpoints (order management).
137///
138/// The client automatically respects these limits through the configured quota.
139#[derive(Debug, Clone)]
140pub struct BitmexRawHttpClient {
141    base_url: String,
142    client: HttpClient,
143    credential: Option<Credential>,
144    recv_window_ms: u64,
145    retry_manager: RetryManager<BitmexHttpError>,
146    cancellation_token: CancellationToken,
147}
148
149impl Default for BitmexRawHttpClient {
150    fn default() -> Self {
151        Self::new(None, Some(60), None, None, None, None, None, None, None)
152            .expect("Failed to create default BitmexHttpInnerClient")
153    }
154}
155
156impl BitmexRawHttpClient {
157    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
158    /// optionally overridden with a custom base URL.
159    ///
160    /// This version of the client has **no credentials**, so it can only
161    /// call publicly accessible endpoints.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the retry manager cannot be created.
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        base_url: Option<String>,
169        timeout_secs: Option<u64>,
170        max_retries: Option<u32>,
171        retry_delay_ms: Option<u64>,
172        retry_delay_max_ms: Option<u64>,
173        recv_window_ms: Option<u64>,
174        max_requests_per_second: Option<u32>,
175        max_requests_per_minute: Option<u32>,
176        proxy_url: Option<String>,
177    ) -> Result<Self, BitmexHttpError> {
178        let retry_config = RetryConfig {
179            max_retries: max_retries.unwrap_or(3),
180            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182            backoff_factor: 2.0,
183            jitter_ms: 1000,
184            operation_timeout_ms: Some(60_000),
185            immediate_first: false,
186            max_elapsed_ms: Some(180_000),
187        };
188
189        let retry_manager = RetryManager::new(retry_config);
190
191        let max_req_per_sec =
192            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193        let max_req_per_min =
194            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196        Ok(Self {
197            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198            client: HttpClient::new(
199                Self::default_headers(),
200                vec![],
201                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202                Some(Self::default_quota(max_req_per_sec)),
203                timeout_secs,
204                proxy_url,
205            )
206            .map_err(|e| {
207                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208            })?,
209            credential: None,
210            recv_window_ms: recv_window_ms.unwrap_or(10_000),
211            retry_manager,
212            cancellation_token: CancellationToken::new(),
213        })
214    }
215
216    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
217    /// for authenticated requests, optionally using a custom base URL.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the retry manager cannot be created.
222    #[allow(clippy::too_many_arguments)]
223    pub fn with_credentials(
224        api_key: String,
225        api_secret: String,
226        base_url: String,
227        timeout_secs: Option<u64>,
228        max_retries: Option<u32>,
229        retry_delay_ms: Option<u64>,
230        retry_delay_max_ms: Option<u64>,
231        recv_window_ms: Option<u64>,
232        max_requests_per_second: Option<u32>,
233        max_requests_per_minute: Option<u32>,
234        proxy_url: Option<String>,
235    ) -> Result<Self, BitmexHttpError> {
236        let retry_config = RetryConfig {
237            max_retries: max_retries.unwrap_or(3),
238            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240            backoff_factor: 2.0,
241            jitter_ms: 1000,
242            operation_timeout_ms: Some(60_000),
243            immediate_first: false,
244            max_elapsed_ms: Some(180_000),
245        };
246
247        let retry_manager = RetryManager::new(retry_config);
248
249        let max_req_per_sec =
250            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251        let max_req_per_min =
252            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254        Ok(Self {
255            base_url,
256            client: HttpClient::new(
257                Self::default_headers(),
258                vec![],
259                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260                Some(Self::default_quota(max_req_per_sec)),
261                timeout_secs,
262                proxy_url,
263            )
264            .map_err(|e| {
265                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266            })?,
267            credential: Some(Credential::new(api_key, api_secret)),
268            recv_window_ms: recv_window_ms.unwrap_or(10_000),
269            retry_manager,
270            cancellation_token: CancellationToken::new(),
271        })
272    }
273
274    fn default_headers() -> HashMap<String, String> {
275        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276    }
277
278    fn default_quota(max_requests_per_second: u32) -> Quota {
279        Quota::per_second(
280            NonZeroU32::new(max_requests_per_second)
281                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282        )
283    }
284
285    fn rate_limiter_quotas(
286        max_requests_per_second: u32,
287        max_requests_per_minute: u32,
288    ) -> Vec<(String, Quota)> {
289        let per_sec_quota = Quota::per_second(
290            NonZeroU32::new(max_requests_per_second)
291                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292        );
293        let per_min_quota =
294            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296            }));
297
298        vec![
299            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301        ]
302    }
303
304    fn rate_limit_keys() -> Vec<Ustr> {
305        RATE_LIMIT_KEYS.clone()
306    }
307
308    /// Cancel all pending HTTP requests.
309    pub fn cancel_all_requests(&self) {
310        self.cancellation_token.cancel();
311    }
312
313    /// Get the cancellation token for this client.
314    pub fn cancellation_token(&self) -> &CancellationToken {
315        &self.cancellation_token
316    }
317
318    fn sign_request(
319        &self,
320        method: &Method,
321        endpoint: &str,
322        body: Option<&[u8]>,
323    ) -> Result<HashMap<String, String>, BitmexHttpError> {
324        let credential = self
325            .credential
326            .as_ref()
327            .ok_or(BitmexHttpError::MissingCredentials)?;
328
329        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332        let full_path = if endpoint.starts_with("/api/v1") {
333            endpoint.to_string()
334        } else {
335            format!("/api/v1{endpoint}")
336        };
337
338        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340        let mut headers = HashMap::new();
341        headers.insert("api-expires".to_string(), expires.to_string());
342        headers.insert("api-key".to_string(), credential.api_key.to_string());
343        headers.insert("api-signature".to_string(), signature);
344
345        // Add Content-Type header for form-encoded body
346        if body.is_some()
347            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348        {
349            headers.insert(
350                "Content-Type".to_string(),
351                "application/x-www-form-urlencoded".to_string(),
352            );
353        }
354
355        Ok(headers)
356    }
357
358    async fn send_request<T: DeserializeOwned, P: Serialize>(
359        &self,
360        method: Method,
361        endpoint: &str,
362        params: Option<&P>,
363        body: Option<Vec<u8>>,
364        authenticate: bool,
365    ) -> Result<T, BitmexHttpError> {
366        let endpoint = endpoint.to_string();
367        let method_clone = method.clone();
368        let body_clone = body.clone();
369
370        // Serialize params before closure to avoid reference lifetime issues
371        // Query params are used with GET and DELETE methods
372        let params_str = if method == Method::GET || method == Method::DELETE {
373            params
374                .map(serde_urlencoded::to_string)
375                .transpose()
376                .map_err(|e| {
377                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378                })?
379        } else {
380            None
381        };
382
383        let full_endpoint = match params_str {
384            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385            _ => endpoint.clone(),
386        };
387
388        let url = format!("{}{}", self.base_url, full_endpoint);
389
390        let operation = || {
391            let url = url.clone();
392            let method = method_clone.clone();
393            let body = body_clone.clone();
394            let full_endpoint = full_endpoint.clone();
395
396            async move {
397                let headers = if authenticate {
398                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399                } else {
400                    None
401                };
402
403                let rate_keys = Self::rate_limit_keys();
404                let resp = self
405                    .client
406                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407                    .await?;
408
409                if resp.status.is_success() {
410                    serde_json::from_slice(&resp.body).map_err(Into::into)
411                } else if let Ok(error_resp) =
412                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413                {
414                    Err(error_resp.into())
415                } else {
416                    Err(BitmexHttpError::UnexpectedStatus {
417                        status: StatusCode::from_u16(resp.status.as_u16())
418                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419                        body: String::from_utf8_lossy(&resp.body).to_string(),
420                    })
421                }
422            }
423        };
424
425        // Retry strategy based on BitMEX error responses and HTTP status codes:
426        //
427        // 1. Network errors: always retry (transient connection issues).
428        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
429        // 3. BitMEX JSON errors with specific handling:
430        //    - "RateLimitError": explicit rate limit error from BitMEX.
431        //    - "HTTPError": generic error name used by BitMEX for various issues
432        //      Only retry if message contains "rate limit" to avoid retrying
433        //      non-transient errors like authentication failures, validation errors,
434        //      insufficient balance, etc. which also return as "HTTPError".
435        //
436        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
437        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
438        // be retried. We only retry when the message explicitly mentions rate limiting.
439        //
440        // See tests in tests/http.rs for retry behavior validation.
441        let should_retry = |error: &BitmexHttpError| -> bool {
442            match error {
443                BitmexHttpError::NetworkError(_) => true,
444                BitmexHttpError::UnexpectedStatus { status, .. } => {
445                    status.as_u16() >= 500 || status.as_u16() == 429
446                }
447                BitmexHttpError::BitmexError {
448                    error_name,
449                    message,
450                } => {
451                    error_name == "RateLimitError"
452                        || (error_name == "HTTPError"
453                            && message.to_lowercase().contains("rate limit"))
454                }
455                _ => false,
456            }
457        };
458
459        let create_error = |msg: String| -> BitmexHttpError {
460            if msg == "canceled" {
461                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462            } else {
463                BitmexHttpError::NetworkError(msg)
464            }
465        };
466
467        self.retry_manager
468            .execute_with_retry_with_cancel(
469                endpoint.as_str(),
470                operation,
471                should_retry,
472                create_error,
473                &self.cancellation_token,
474            )
475            .await
476    }
477
478    /// Get all instruments.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
483    pub async fn get_instruments(
484        &self,
485        active_only: bool,
486    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487        let path = if active_only {
488            "/instrument/active"
489        } else {
490            "/instrument"
491        };
492        self.send_request::<_, ()>(Method::GET, path, None, None, false)
493            .await
494    }
495
496    /// Requests the current server time from BitMEX.
497    ///
498    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
499    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the HTTP request fails or if the response body
504    /// cannot be parsed into [`BitmexApiInfo`].
505    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506        let response: BitmexApiInfo = self
507            .send_request::<_, ()>(Method::GET, "", None, None, false)
508            .await?;
509        Ok(response.timestamp)
510    }
511
512    /// Get the instrument definition for the specified symbol.
513    ///
514    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
515    /// a single symbol is requested. This helper returns the first element of
516    /// that array and yields `Ok(None)` when the venue returns an empty list
517    /// (e.g. unknown symbol).
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if the request fails or the payload cannot be deserialized.
522    pub async fn get_instrument(
523        &self,
524        symbol: &str,
525    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526        let path = &format!("/instrument?symbol={symbol}");
527        let instruments: Vec<BitmexInstrument> = self
528            .send_request::<_, ()>(Method::GET, path, None, None, false)
529            .await?;
530
531        Ok(instruments.into_iter().next())
532    }
533
534    /// Get user wallet information.
535    ///
536    /// # Errors
537    ///
538    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
539    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540        let endpoint = "/user/wallet";
541        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542            .await
543    }
544
545    /// Get user margin information.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
550    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551        let path = format!("/user/margin?currency={currency}");
552        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553            .await
554    }
555
556    /// Get historical trades.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
561    ///
562    /// # Panics
563    ///
564    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
565    pub async fn get_trades(
566        &self,
567        params: GetTradeParams,
568    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569        self.send_request(Method::GET, "/trade", Some(&params), None, true)
570            .await
571    }
572
573    /// Get bucketed (aggregated) trade data.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
578    pub async fn get_trade_bucketed(
579        &self,
580        params: GetTradeBucketedParams,
581    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
583            .await
584    }
585
586    /// Get user orders.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
591    ///
592    /// # Panics
593    ///
594    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
595    pub async fn get_orders(
596        &self,
597        params: GetOrderParams,
598    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599        self.send_request(Method::GET, "/order", Some(&params), None, true)
600            .await
601    }
602
603    /// Place a new order.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
608    ///
609    /// # Panics
610    ///
611    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
612    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613        // BitMEX spec requires form-encoded body for POST /order
614        let body = serde_urlencoded::to_string(&params)
615            .map_err(|e| {
616                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617            })?
618            .into_bytes();
619        let path = "/order";
620        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621            .await
622    }
623
624    /// Cancel user orders.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
629    ///
630    /// # Panics
631    ///
632    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
633    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634        // BitMEX spec requires form-encoded body for DELETE /order
635        let body = serde_urlencoded::to_string(&params)
636            .map_err(|e| {
637                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638            })?
639            .into_bytes();
640        let path = "/order";
641        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642            .await
643    }
644
645    /// Amend an existing order.
646    ///
647    /// # Errors
648    ///
649    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
650    ///
651    /// # Panics
652    ///
653    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
654    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655        // BitMEX spec requires form-encoded body for PUT /order
656        let body = serde_urlencoded::to_string(&params)
657            .map_err(|e| {
658                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659            })?
660            .into_bytes();
661        let path = "/order";
662        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663            .await
664    }
665
666    /// Cancel all orders.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
671    ///
672    /// # Panics
673    ///
674    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
675    ///
676    /// # References
677    ///
678    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
679    pub async fn cancel_all_orders(
680        &self,
681        params: DeleteAllOrdersParams,
682    ) -> Result<Value, BitmexHttpError> {
683        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
684            .await
685    }
686
687    /// Get user executions.
688    ///
689    /// # Errors
690    ///
691    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
692    ///
693    /// # Panics
694    ///
695    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
696    pub async fn get_executions(
697        &self,
698        params: GetExecutionParams,
699    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700        let query = serde_urlencoded::to_string(&params).map_err(|e| {
701            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702        })?;
703        let path = format!("/execution/tradeHistory?{query}");
704        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705            .await
706    }
707
708    /// Get user positions.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
713    ///
714    /// # Panics
715    ///
716    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
717    pub async fn get_positions(
718        &self,
719        params: GetPositionParams,
720    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721        self.send_request(Method::GET, "/position", Some(&params), None, true)
722            .await
723    }
724
725    /// Update position leverage.
726    ///
727    /// # Errors
728    ///
729    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
730    ///
731    /// # Panics
732    ///
733    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
734    pub async fn update_position_leverage(
735        &self,
736        params: PostPositionLeverageParams,
737    ) -> Result<BitmexPosition, BitmexHttpError> {
738        // BitMEX spec requires form-encoded body for POST endpoints
739        let body = serde_urlencoded::to_string(&params)
740            .map_err(|e| {
741                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742            })?
743            .into_bytes();
744        let path = "/position/leverage";
745        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746            .await
747    }
748}
749
750/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
751///
752/// This is the high-level client that wraps the inner client and provides
753/// Nautilus-specific functionality for trading operations.
754#[derive(Debug)]
755#[cfg_attr(
756    feature = "python",
757    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
758)]
759pub struct BitmexHttpClient {
760    inner: Arc<BitmexRawHttpClient>,
761    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
762    cache_initialized: AtomicBool,
763}
764
765impl Clone for BitmexHttpClient {
766    fn clone(&self) -> Self {
767        let cache_initialized = AtomicBool::new(false);
768
769        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770        if is_initialized {
771            cache_initialized.store(true, Ordering::Release);
772        }
773
774        Self {
775            inner: self.inner.clone(),
776            instruments_cache: self.instruments_cache.clone(),
777            cache_initialized,
778        }
779    }
780}
781
782impl Default for BitmexHttpClient {
783    fn default() -> Self {
784        Self::new(
785            None,
786            None,
787            None,
788            false,
789            Some(60),
790            None,
791            None,
792            None,
793            None,
794            None,
795            None,
796            None, // proxy_url
797        )
798        .expect("Failed to create default BitmexHttpClient")
799    }
800}
801
802impl BitmexHttpClient {
803    /// Creates a new [`BitmexHttpClient`] instance.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the HTTP client cannot be created.
808    #[allow(clippy::too_many_arguments)]
809    pub fn new(
810        base_url: Option<String>,
811        api_key: Option<String>,
812        api_secret: Option<String>,
813        testnet: bool,
814        timeout_secs: Option<u64>,
815        max_retries: Option<u32>,
816        retry_delay_ms: Option<u64>,
817        retry_delay_max_ms: Option<u64>,
818        recv_window_ms: Option<u64>,
819        max_requests_per_second: Option<u32>,
820        max_requests_per_minute: Option<u32>,
821        proxy_url: Option<String>,
822    ) -> Result<Self, BitmexHttpError> {
823        // Determine the base URL
824        let url = base_url.unwrap_or_else(|| {
825            if testnet {
826                BITMEX_HTTP_TESTNET_URL.to_string()
827            } else {
828                BITMEX_HTTP_URL.to_string()
829            }
830        });
831
832        let inner = match (api_key, api_secret) {
833            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834                key,
835                secret,
836                url,
837                timeout_secs,
838                max_retries,
839                retry_delay_ms,
840                retry_delay_max_ms,
841                recv_window_ms,
842                max_requests_per_second,
843                max_requests_per_minute,
844                proxy_url,
845            )?,
846            _ => BitmexRawHttpClient::new(
847                Some(url),
848                timeout_secs,
849                max_retries,
850                retry_delay_ms,
851                retry_delay_max_ms,
852                recv_window_ms,
853                max_requests_per_second,
854                max_requests_per_minute,
855                proxy_url,
856            )?,
857        };
858
859        Ok(Self {
860            inner: Arc::new(inner),
861            instruments_cache: Arc::new(DashMap::new()),
862            cache_initialized: AtomicBool::new(false),
863        })
864    }
865
866    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
867    /// the default BitMEX HTTP base URL.
868    ///
869    /// # Errors
870    ///
871    /// Returns an error if required environment variables are not set or invalid.
872    pub fn from_env() -> anyhow::Result<Self> {
873        Self::with_credentials(
874            None, None, None, None, None, None, None, None, None, None, None,
875        )
876        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
877    }
878
879    /// Creates a new [`BitmexHttpClient`] configured with credentials
880    /// for authenticated requests.
881    ///
882    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
883    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if one credential is provided without the other.
888    #[allow(clippy::too_many_arguments)]
889    pub fn with_credentials(
890        api_key: Option<String>,
891        api_secret: Option<String>,
892        base_url: Option<String>,
893        timeout_secs: Option<u64>,
894        max_retries: Option<u32>,
895        retry_delay_ms: Option<u64>,
896        retry_delay_max_ms: Option<u64>,
897        recv_window_ms: Option<u64>,
898        max_requests_per_second: Option<u32>,
899        max_requests_per_minute: Option<u32>,
900        proxy_url: Option<String>,
901    ) -> anyhow::Result<Self> {
902        // Determine testnet from URL first to select correct environment variables
903        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905        // Choose environment variables based on testnet flag
906        let (key_var, secret_var) = if testnet {
907            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908        } else {
909            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910        };
911
912        let api_key = get_or_env_var_opt(api_key, key_var);
913        let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915        // If we're trying to create an authenticated client, we need both key and secret
916        if api_key.is_some() && api_secret.is_none() {
917            anyhow::bail!("{secret_var} is required when {key_var} is provided");
918        }
919        if api_key.is_none() && api_secret.is_some() {
920            anyhow::bail!("{key_var} is required when {secret_var} is provided");
921        }
922
923        Self::new(
924            base_url,
925            api_key,
926            api_secret,
927            testnet,
928            timeout_secs,
929            max_retries,
930            retry_delay_ms,
931            retry_delay_max_ms,
932            recv_window_ms,
933            max_requests_per_second,
934            max_requests_per_minute,
935            proxy_url,
936        )
937        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938    }
939
940    /// Returns the base url being used by the client.
941    #[must_use]
942    pub fn base_url(&self) -> &str {
943        self.inner.base_url.as_str()
944    }
945
946    /// Returns the public API key being used by the client.
947    #[must_use]
948    pub fn api_key(&self) -> Option<&str> {
949        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
950    }
951
952    /// Returns a masked version of the API key for logging purposes.
953    #[must_use]
954    pub fn api_key_masked(&self) -> Option<String> {
955        self.inner.credential.as_ref().map(|c| c.api_key_masked())
956    }
957
958    /// Requests the current server time from BitMEX.
959    ///
960    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
965    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
966        self.inner.get_server_time().await
967    }
968
969    /// Generates a timestamp for initialization.
970    fn generate_ts_init(&self) -> UnixNanos {
971        get_atomic_clock_realtime().get_time_ns()
972    }
973
974    /// Check if the order has a contingency type that requires linking.
975    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
976        matches!(
977            contingency_type,
978            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
979        )
980    }
981
982    /// Check if the order is a parent in contingency relationships.
983    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
984        matches!(
985            contingency_type,
986            ContingencyType::Oco | ContingencyType::Oto
987        )
988    }
989
990    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
991    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
992        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
993        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
994        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
995        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
996
997        for report in reports.iter() {
998            let Some(client_order_id) = report.client_order_id else {
999                continue;
1000            };
1001
1002            if let Some(order_list_id) = report.order_list_id {
1003                order_list_groups
1004                    .entry(order_list_id)
1005                    .or_default()
1006                    .push(client_order_id);
1007
1008                if Self::is_parent_contingency(report.contingency_type) {
1009                    order_list_parents
1010                        .entry(order_list_id)
1011                        .or_insert(client_order_id);
1012                }
1013            }
1014
1015            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1016                && Self::is_contingent_order(report.contingency_type)
1017            {
1018                prefix_groups
1019                    .entry(base.to_owned())
1020                    .or_default()
1021                    .push(client_order_id);
1022
1023                if Self::is_parent_contingency(report.contingency_type) {
1024                    prefix_parents
1025                        .entry(base.to_owned())
1026                        .or_insert(client_order_id);
1027                }
1028            }
1029        }
1030
1031        for report in reports.iter_mut() {
1032            let Some(client_order_id) = report.client_order_id else {
1033                continue;
1034            };
1035
1036            if report.linked_order_ids.is_some() {
1037                continue;
1038            }
1039
1040            // Only process contingent orders
1041            if !Self::is_contingent_order(report.contingency_type) {
1042                continue;
1043            }
1044
1045            if let Some(order_list_id) = report.order_list_id
1046                && let Some(group) = order_list_groups.get(&order_list_id)
1047            {
1048                let mut linked: Vec<ClientOrderId> = group
1049                    .iter()
1050                    .copied()
1051                    .filter(|candidate| candidate != &client_order_id)
1052                    .collect();
1053
1054                if !linked.is_empty() {
1055                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1056                        if client_order_id != *parent_id {
1057                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1058                            report.parent_order_id = Some(*parent_id);
1059                        } else {
1060                            report.parent_order_id = None;
1061                        }
1062                    } else {
1063                        report.parent_order_id = None;
1064                    }
1065
1066                    tracing::trace!(
1067                        client_order_id = ?client_order_id,
1068                        order_list_id = ?order_list_id,
1069                        contingency_type = ?report.contingency_type,
1070                        linked_order_ids = ?linked,
1071                        "BitMEX linked ids sourced from order list id",
1072                    );
1073                    report.linked_order_ids = Some(linked);
1074                    continue;
1075                }
1076
1077                tracing::trace!(
1078                    client_order_id = ?client_order_id,
1079                    order_list_id = ?order_list_id,
1080                    contingency_type = ?report.contingency_type,
1081                    order_list_group = ?group,
1082                    "BitMEX order list id group had no peers",
1083                );
1084                report.parent_order_id = None;
1085            } else if report.order_list_id.is_none() {
1086                report.parent_order_id = None;
1087            }
1088
1089            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1090                && let Some(group) = prefix_groups.get(base)
1091            {
1092                let mut linked: Vec<ClientOrderId> = group
1093                    .iter()
1094                    .copied()
1095                    .filter(|candidate| candidate != &client_order_id)
1096                    .collect();
1097
1098                if !linked.is_empty() {
1099                    if let Some(parent_id) = prefix_parents.get(base) {
1100                        if client_order_id != *parent_id {
1101                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1102                            report.parent_order_id = Some(*parent_id);
1103                        } else {
1104                            report.parent_order_id = None;
1105                        }
1106                    } else {
1107                        report.parent_order_id = None;
1108                    }
1109
1110                    tracing::trace!(
1111                        client_order_id = ?client_order_id,
1112                        contingency_type = ?report.contingency_type,
1113                        base = base,
1114                        linked_order_ids = ?linked,
1115                        "BitMEX linked ids constructed from client order id prefix",
1116                    );
1117                    report.linked_order_ids = Some(linked);
1118                    continue;
1119                }
1120
1121                tracing::trace!(
1122                    client_order_id = ?client_order_id,
1123                    contingency_type = ?report.contingency_type,
1124                    base = base,
1125                    prefix_group = ?group,
1126                    "BitMEX client order id prefix group had no peers",
1127                );
1128                report.parent_order_id = None;
1129            } else if client_order_id.as_str().contains('-') {
1130                report.parent_order_id = None;
1131            }
1132
1133            if Self::is_contingent_order(report.contingency_type) {
1134                tracing::warn!(
1135                    client_order_id = ?report.client_order_id,
1136                    order_list_id = ?report.order_list_id,
1137                    contingency_type = ?report.contingency_type,
1138                    "BitMEX order status report missing linked ids after grouping",
1139                );
1140                report.contingency_type = ContingencyType::NoContingency;
1141                report.parent_order_id = None;
1142            }
1143
1144            report.linked_order_ids = None;
1145        }
1146    }
1147
1148    /// Cancel all pending HTTP requests.
1149    pub fn cancel_all_requests(&self) {
1150        self.inner.cancel_all_requests();
1151    }
1152
1153    /// Get the cancellation token for this client.
1154    pub fn cancellation_token(&self) -> CancellationToken {
1155        self.inner.cancellation_token().clone()
1156    }
1157
1158    /// Caches a single instrument.
1159    ///
1160    /// Any existing instrument with the same symbol will be replaced.
1161    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1162        self.instruments_cache
1163            .insert(instrument.raw_symbol().inner(), instrument);
1164        self.cache_initialized.store(true, Ordering::Release);
1165    }
1166
1167    /// Gets an instrument from the cache by symbol.
1168    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169        self.instruments_cache
1170            .get(symbol)
1171            .map(|entry| entry.value().clone())
1172    }
1173
1174    /// Request a single instrument and parse it into a Nautilus type.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1179    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1180    /// cannot be converted into a Nautilus `Instrument`.
1181    pub async fn request_instrument(
1182        &self,
1183        instrument_id: InstrumentId,
1184    ) -> anyhow::Result<Option<InstrumentAny>> {
1185        let response = self
1186            .inner
1187            .get_instrument(instrument_id.symbol.as_str())
1188            .await?;
1189
1190        let instrument = match response {
1191            Some(instrument) => instrument,
1192            None => return Ok(None),
1193        };
1194
1195        let ts_init = self.generate_ts_init();
1196
1197        match parse_instrument_any(&instrument, ts_init) {
1198            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199            InstrumentParseResult::Unsupported {
1200                symbol,
1201                instrument_type,
1202            } => {
1203                tracing::debug!(
1204                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205                );
1206                Ok(None)
1207            }
1208            InstrumentParseResult::Failed {
1209                symbol,
1210                instrument_type,
1211                error,
1212            } => {
1213                tracing::error!(
1214                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1215                );
1216                Ok(None)
1217            }
1218        }
1219    }
1220
1221    /// Request all available instruments and parse them into Nautilus types.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns an error if the HTTP request fails or parsing fails.
1226    pub async fn request_instruments(
1227        &self,
1228        active_only: bool,
1229    ) -> anyhow::Result<Vec<InstrumentAny>> {
1230        let instruments = self.inner.get_instruments(active_only).await?;
1231        let ts_init = self.generate_ts_init();
1232
1233        let mut parsed_instruments = Vec::new();
1234        let mut skipped_count = 0;
1235        let mut failed_count = 0;
1236        let total_count = instruments.len();
1237
1238        for inst in instruments {
1239            match parse_instrument_any(&inst, ts_init) {
1240                InstrumentParseResult::Ok(instrument_any) => {
1241                    parsed_instruments.push(*instrument_any);
1242                }
1243                InstrumentParseResult::Unsupported {
1244                    symbol,
1245                    instrument_type,
1246                } => {
1247                    skipped_count += 1;
1248                    tracing::debug!(
1249                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1250                    );
1251                }
1252                InstrumentParseResult::Failed {
1253                    symbol,
1254                    instrument_type,
1255                    error,
1256                } => {
1257                    failed_count += 1;
1258                    tracing::error!(
1259                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1260                    );
1261                }
1262            }
1263        }
1264
1265        if skipped_count > 0 {
1266            tracing::info!(
1267                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1268            );
1269        }
1270
1271        if failed_count > 0 {
1272            tracing::error!(
1273                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1274                parsed_instruments.len()
1275            );
1276        }
1277
1278        Ok(parsed_instruments)
1279    }
1280
1281    /// Get user wallet information.
1282    ///
1283    /// # Errors
1284    ///
1285    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1286    ///
1287    /// # Panics
1288    ///
1289    /// Panics if the inner mutex is poisoned.
1290    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1291        let inner = self.inner.clone();
1292        inner.get_wallet().await
1293    }
1294
1295    /// Get user orders.
1296    ///
1297    /// # Errors
1298    ///
1299    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1300    ///
1301    /// # Panics
1302    ///
1303    /// Panics if the inner mutex is poisoned.
1304    pub async fn get_orders(
1305        &self,
1306        params: GetOrderParams,
1307    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1308        let inner = self.inner.clone();
1309        inner.get_orders(params).await
1310    }
1311
1312    /// Get instrument from the instruments cache (if found).
1313    ///
1314    /// # Errors
1315    ///
1316    /// Returns an error if the instrument is not found in the cache.
1317    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1318        self.get_instrument(&symbol).ok_or_else(|| {
1319            anyhow::anyhow!(
1320                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1321            )
1322        })
1323    }
1324
1325    /// Returns the cached price precision for the given symbol.
1326    ///
1327    /// # Errors
1328    ///
1329    /// Returns an error if the instrument was never cached (for example, if
1330    /// instruments were not loaded prior to use).
1331    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1332        self.instrument_from_cache(symbol)
1333            .map(|instrument| instrument.price_precision())
1334    }
1335
1336    /// Get user margin information.
1337    ///
1338    /// # Errors
1339    ///
1340    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1341    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1342        self.inner
1343            .get_margin(currency)
1344            .await
1345            .map_err(|e| anyhow::anyhow!(e))
1346    }
1347
1348    /// Request account state for the given account.
1349    ///
1350    /// # Errors
1351    ///
1352    /// Returns an error if the HTTP request fails or no account state is returned.
1353    pub async fn request_account_state(
1354        &self,
1355        account_id: AccountId,
1356    ) -> anyhow::Result<AccountState> {
1357        // Get margin data for XBt (Bitcoin) by default
1358        let margin = self
1359            .inner
1360            .get_margin("XBt")
1361            .await
1362            .map_err(|e| anyhow::anyhow!(e))?;
1363
1364        let ts_init =
1365            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1366
1367        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1368        let margin_msg = BitmexMarginMsg {
1369            account: margin.account,
1370            currency: margin.currency,
1371            risk_limit: margin.risk_limit,
1372            amount: margin.amount,
1373            prev_realised_pnl: margin.prev_realised_pnl,
1374            gross_comm: margin.gross_comm,
1375            gross_open_cost: margin.gross_open_cost,
1376            gross_open_premium: margin.gross_open_premium,
1377            gross_exec_cost: margin.gross_exec_cost,
1378            gross_mark_value: margin.gross_mark_value,
1379            risk_value: margin.risk_value,
1380            init_margin: margin.init_margin,
1381            maint_margin: margin.maint_margin,
1382            target_excess_margin: margin.target_excess_margin,
1383            realised_pnl: margin.realised_pnl,
1384            unrealised_pnl: margin.unrealised_pnl,
1385            wallet_balance: margin.wallet_balance,
1386            margin_balance: margin.margin_balance,
1387            margin_leverage: margin.margin_leverage,
1388            margin_used_pcnt: margin.margin_used_pcnt,
1389            excess_margin: margin.excess_margin,
1390            available_margin: margin.available_margin,
1391            withdrawable_margin: margin.withdrawable_margin,
1392            maker_fee_discount: None, // Not in HTTP response
1393            taker_fee_discount: None, // Not in HTTP response
1394            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1395            foreign_margin_balance: None,
1396            foreign_requirement: None,
1397        };
1398
1399        parse_account_state(&margin_msg, account_id, ts_init)
1400    }
1401
1402    /// Submit a new order.
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns an error if credentials are missing, the request fails, order validation fails,
1407    /// the order is rejected, or the API returns an error.
1408    #[allow(clippy::too_many_arguments)]
1409    pub async fn submit_order(
1410        &self,
1411        instrument_id: InstrumentId,
1412        client_order_id: ClientOrderId,
1413        order_side: OrderSide,
1414        order_type: OrderType,
1415        quantity: Quantity,
1416        time_in_force: TimeInForce,
1417        price: Option<Price>,
1418        trigger_price: Option<Price>,
1419        trigger_type: Option<TriggerType>,
1420        display_qty: Option<Quantity>,
1421        post_only: bool,
1422        reduce_only: bool,
1423        order_list_id: Option<OrderListId>,
1424        contingency_type: Option<ContingencyType>,
1425    ) -> anyhow::Result<OrderStatusReport> {
1426        use crate::common::enums::{
1427            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1428        };
1429
1430        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1431
1432        let mut params = super::query::PostOrderParamsBuilder::default();
1433        params.text(NAUTILUS_TRADER);
1434        params.symbol(instrument_id.symbol.as_str());
1435        params.cl_ord_id(client_order_id.as_str());
1436
1437        let side = BitmexSide::try_from_order_side(order_side)?;
1438        params.side(side);
1439
1440        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1441        params.ord_type(ord_type);
1442
1443        params.order_qty(quantity_to_u32(&quantity, &instrument));
1444
1445        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1446        params.time_in_force(tif);
1447
1448        if let Some(price) = price {
1449            params.price(price.as_f64());
1450        }
1451
1452        if let Some(trigger_price) = trigger_price {
1453            params.stop_px(trigger_price.as_f64());
1454        }
1455
1456        if let Some(display_qty) = display_qty {
1457            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1458        }
1459
1460        if let Some(order_list_id) = order_list_id {
1461            params.cl_ord_link_id(order_list_id.as_str());
1462        }
1463
1464        let mut exec_inst = Vec::new();
1465
1466        if post_only {
1467            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1468        }
1469
1470        if reduce_only {
1471            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1472        }
1473
1474        if trigger_price.is_some()
1475            && let Some(trigger_type) = trigger_type
1476        {
1477            match trigger_type {
1478                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1479                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1480                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1481                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1482            }
1483        }
1484
1485        if !exec_inst.is_empty() {
1486            params.exec_inst(exec_inst);
1487        }
1488
1489        if let Some(contingency_type) = contingency_type {
1490            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1491            params.contingency_type(bitmex_contingency);
1492        }
1493
1494        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1495
1496        let response = self.inner.place_order(params).await?;
1497
1498        let order: BitmexOrder = serde_json::from_value(response)?;
1499
1500        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1501            let reason = order
1502                .ord_rej_reason
1503                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1504            anyhow::bail!("Order rejected: {reason}");
1505        }
1506
1507        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1508        let ts_init = self.generate_ts_init();
1509
1510        parse_order_status_report(&order, &instrument, ts_init)
1511    }
1512
1513    /// Cancel an order.
1514    ///
1515    /// # Errors
1516    ///
1517    /// Returns an error if:
1518    /// - Credentials are missing.
1519    /// - The request fails.
1520    /// - The order doesn't exist.
1521    /// - The API returns an error.
1522    pub async fn cancel_order(
1523        &self,
1524        instrument_id: InstrumentId,
1525        client_order_id: Option<ClientOrderId>,
1526        venue_order_id: Option<VenueOrderId>,
1527    ) -> anyhow::Result<OrderStatusReport> {
1528        let mut params = super::query::DeleteOrderParamsBuilder::default();
1529        params.text(NAUTILUS_TRADER);
1530
1531        if let Some(venue_order_id) = venue_order_id {
1532            params.order_id(vec![venue_order_id.as_str().to_string()]);
1533        } else if let Some(client_order_id) = client_order_id {
1534            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1535        } else {
1536            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1537        }
1538
1539        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1540
1541        let response = self.inner.cancel_orders(params).await?;
1542
1543        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1544        let order = orders
1545            .into_iter()
1546            .next()
1547            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1548
1549        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1550        let ts_init = self.generate_ts_init();
1551
1552        parse_order_status_report(&order, &instrument, ts_init)
1553    }
1554
1555    /// Cancel multiple orders.
1556    ///
1557    /// # Errors
1558    ///
1559    /// Returns an error if:
1560    /// - Credentials are missing.
1561    /// - The request fails.
1562    /// - The order doesn't exist.
1563    /// - The API returns an error.
1564    pub async fn cancel_orders(
1565        &self,
1566        instrument_id: InstrumentId,
1567        client_order_ids: Option<Vec<ClientOrderId>>,
1568        venue_order_ids: Option<Vec<VenueOrderId>>,
1569    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1570        let mut params = super::query::DeleteOrderParamsBuilder::default();
1571        params.text(NAUTILUS_TRADER);
1572
1573        // BitMEX API requires either client order IDs or venue order IDs, not both
1574        // Prioritize venue order IDs if both are provided
1575        if let Some(venue_order_ids) = venue_order_ids {
1576            if venue_order_ids.is_empty() {
1577                anyhow::bail!("venue_order_ids cannot be empty");
1578            }
1579            params.order_id(
1580                venue_order_ids
1581                    .iter()
1582                    .map(|id| id.to_string())
1583                    .collect::<Vec<_>>(),
1584            );
1585        } else if let Some(client_order_ids) = client_order_ids {
1586            if client_order_ids.is_empty() {
1587                anyhow::bail!("client_order_ids cannot be empty");
1588            }
1589            params.cl_ord_id(
1590                client_order_ids
1591                    .iter()
1592                    .map(|id| id.to_string())
1593                    .collect::<Vec<_>>(),
1594            );
1595        } else {
1596            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1597        }
1598
1599        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1600
1601        let response = self.inner.cancel_orders(params).await?;
1602
1603        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1604
1605        let ts_init = self.generate_ts_init();
1606        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1607
1608        let mut reports = Vec::new();
1609
1610        for order in orders {
1611            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1612        }
1613
1614        Self::populate_linked_order_ids(&mut reports);
1615
1616        Ok(reports)
1617    }
1618
1619    /// Cancel all orders for an instrument and optionally an order side.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns an error if:
1624    /// - Credentials are missing.
1625    /// - The request fails.
1626    /// - The order doesn't exist.
1627    /// - The API returns an error.
1628    pub async fn cancel_all_orders(
1629        &self,
1630        instrument_id: InstrumentId,
1631        order_side: Option<OrderSide>,
1632    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1633        let mut params = DeleteAllOrdersParamsBuilder::default();
1634        params.text(NAUTILUS_TRADER);
1635        params.symbol(instrument_id.symbol.as_str());
1636
1637        if let Some(side) = order_side {
1638            let side = BitmexSide::try_from_order_side(side)?;
1639            params.filter(serde_json::json!({
1640                "side": side
1641            }));
1642        }
1643
1644        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1645
1646        let response = self.inner.cancel_all_orders(params).await?;
1647
1648        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1649
1650        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1651        let ts_init = self.generate_ts_init();
1652
1653        let mut reports = Vec::new();
1654
1655        for order in orders {
1656            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1657        }
1658
1659        Self::populate_linked_order_ids(&mut reports);
1660
1661        Ok(reports)
1662    }
1663
1664    /// Modify an existing order.
1665    ///
1666    /// # Errors
1667    ///
1668    /// Returns an error if:
1669    /// - Credentials are missing.
1670    /// - The request fails.
1671    /// - The order doesn't exist.
1672    /// - The order is already closed.
1673    /// - The API returns an error.
1674    pub async fn modify_order(
1675        &self,
1676        instrument_id: InstrumentId,
1677        client_order_id: Option<ClientOrderId>,
1678        venue_order_id: Option<VenueOrderId>,
1679        quantity: Option<Quantity>,
1680        price: Option<Price>,
1681        trigger_price: Option<Price>,
1682    ) -> anyhow::Result<OrderStatusReport> {
1683        let mut params = PutOrderParamsBuilder::default();
1684        params.text(NAUTILUS_TRADER);
1685
1686        // Set order ID - prefer venue_order_id if available
1687        if let Some(venue_order_id) = venue_order_id {
1688            params.order_id(venue_order_id.as_str());
1689        } else if let Some(client_order_id) = client_order_id {
1690            params.orig_cl_ord_id(client_order_id.as_str());
1691        } else {
1692            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1693        }
1694
1695        if let Some(quantity) = quantity {
1696            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1697            params.order_qty(quantity_to_u32(&quantity, &instrument));
1698        }
1699
1700        if let Some(price) = price {
1701            params.price(price.as_f64());
1702        }
1703
1704        if let Some(trigger_price) = trigger_price {
1705            params.stop_px(trigger_price.as_f64());
1706        }
1707
1708        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1709
1710        let response = self.inner.amend_order(params).await?;
1711
1712        let order: BitmexOrder = serde_json::from_value(response)?;
1713
1714        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1715            let reason = order
1716                .ord_rej_reason
1717                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1718            anyhow::bail!("Order modification rejected: {reason}");
1719        }
1720
1721        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1722        let ts_init = self.generate_ts_init();
1723
1724        parse_order_status_report(&order, &instrument, ts_init)
1725    }
1726
1727    /// Query a single order by client order ID or venue order ID.
1728    ///
1729    /// # Errors
1730    ///
1731    /// Returns an error if:
1732    /// - Credentials are missing.
1733    /// - The request fails.
1734    /// - The API returns an error.
1735    pub async fn query_order(
1736        &self,
1737        instrument_id: InstrumentId,
1738        client_order_id: Option<ClientOrderId>,
1739        venue_order_id: Option<VenueOrderId>,
1740    ) -> anyhow::Result<Option<OrderStatusReport>> {
1741        let mut params = GetOrderParamsBuilder::default();
1742
1743        let filter_json = if let Some(client_order_id) = client_order_id {
1744            serde_json::json!({
1745                "clOrdID": client_order_id.to_string()
1746            })
1747        } else if let Some(venue_order_id) = venue_order_id {
1748            serde_json::json!({
1749                "orderID": venue_order_id.to_string()
1750            })
1751        } else {
1752            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1753        };
1754
1755        params.filter(filter_json);
1756        params.count(1); // Only need one order
1757
1758        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1759
1760        let response = self.inner.get_orders(params).await?;
1761
1762        if response.is_empty() {
1763            return Ok(None);
1764        }
1765
1766        let order = &response[0];
1767
1768        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1769        let ts_init = self.generate_ts_init();
1770
1771        let report = parse_order_status_report(order, &instrument, ts_init)?;
1772
1773        Ok(Some(report))
1774    }
1775
1776    /// Request a single order status report.
1777    ///
1778    /// # Errors
1779    ///
1780    /// Returns an error if:
1781    /// - Credentials are missing.
1782    /// - The request fails.
1783    /// - The API returns an error.
1784    pub async fn request_order_status_report(
1785        &self,
1786        instrument_id: InstrumentId,
1787        client_order_id: Option<ClientOrderId>,
1788        venue_order_id: Option<VenueOrderId>,
1789    ) -> anyhow::Result<OrderStatusReport> {
1790        let mut params = GetOrderParamsBuilder::default();
1791        params.symbol(instrument_id.symbol.as_str());
1792
1793        if let Some(venue_order_id) = venue_order_id {
1794            params.filter(serde_json::json!({
1795                "orderID": venue_order_id.as_str()
1796            }));
1797        } else if let Some(client_order_id) = client_order_id {
1798            params.filter(serde_json::json!({
1799                "clOrdID": client_order_id.as_str()
1800            }));
1801        }
1802
1803        params.count(1i32);
1804        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1805
1806        let response = self.inner.get_orders(params).await?;
1807
1808        let order = response
1809            .into_iter()
1810            .next()
1811            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1812
1813        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1814        let ts_init = self.generate_ts_init();
1815
1816        parse_order_status_report(&order, &instrument, ts_init)
1817    }
1818
1819    /// Request multiple order status reports.
1820    ///
1821    /// # Errors
1822    ///
1823    /// Returns an error if:
1824    /// - Credentials are missing.
1825    /// - The request fails.
1826    /// - The API returns an error.
1827    pub async fn request_order_status_reports(
1828        &self,
1829        instrument_id: Option<InstrumentId>,
1830        open_only: bool,
1831        limit: Option<u32>,
1832    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1833        let mut params = GetOrderParamsBuilder::default();
1834
1835        if let Some(instrument_id) = &instrument_id {
1836            params.symbol(instrument_id.symbol.as_str());
1837        }
1838
1839        if open_only {
1840            params.filter(serde_json::json!({
1841                "open": true
1842            }));
1843        }
1844
1845        if let Some(limit) = limit {
1846            params.count(limit as i32);
1847        } else {
1848            params.count(500); // Default count to avoid empty query
1849        }
1850
1851        params.reverse(true); // Get newest orders first
1852
1853        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1854
1855        let response = self.inner.get_orders(params).await?;
1856
1857        let ts_init = self.generate_ts_init();
1858
1859        let mut reports = Vec::new();
1860
1861        for order in response {
1862            // Skip orders without symbol (can happen with query responses)
1863            let Some(symbol) = order.symbol else {
1864                tracing::warn!("Order response missing symbol, skipping");
1865                continue;
1866            };
1867
1868            let Ok(instrument) = self.instrument_from_cache(symbol) else {
1869                tracing::debug!(
1870                    symbol = %symbol,
1871                    "Skipping order report for instrument not in cache"
1872                );
1873                continue;
1874            };
1875
1876            match parse_order_status_report(&order, &instrument, ts_init) {
1877                Ok(report) => reports.push(report),
1878                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1879            }
1880        }
1881
1882        Self::populate_linked_order_ids(&mut reports);
1883
1884        Ok(reports)
1885    }
1886
1887    /// Request trades for the given instrument.
1888    ///
1889    /// # Errors
1890    ///
1891    /// Returns an error if the HTTP request fails or parsing fails.
1892    pub async fn request_trades(
1893        &self,
1894        instrument_id: InstrumentId,
1895        start: Option<DateTime<Utc>>,
1896        end: Option<DateTime<Utc>>,
1897        limit: Option<u32>,
1898    ) -> anyhow::Result<Vec<TradeTick>> {
1899        let mut params = GetTradeParamsBuilder::default();
1900        params.symbol(instrument_id.symbol.as_str());
1901
1902        if let Some(start) = start {
1903            params.start_time(start);
1904        }
1905
1906        if let Some(end) = end {
1907            params.end_time(end);
1908        }
1909
1910        if let (Some(start), Some(end)) = (start, end) {
1911            anyhow::ensure!(
1912                start < end,
1913                "Invalid time range: start={start:?} end={end:?}",
1914            );
1915        }
1916
1917        if let Some(limit) = limit {
1918            let clamped_limit = limit.min(1000);
1919            if limit > 1000 {
1920                tracing::warn!(
1921                    limit,
1922                    clamped_limit,
1923                    "BitMEX trade request limit exceeds venue maximum; clamping",
1924                );
1925            }
1926            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1927        }
1928        params.reverse(false);
1929        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1930
1931        let response = self.inner.get_trades(params).await?;
1932
1933        let ts_init = self.generate_ts_init();
1934
1935        let mut parsed_trades = Vec::new();
1936
1937        for trade in response {
1938            if let Some(start) = start
1939                && trade.timestamp < start
1940            {
1941                continue;
1942            }
1943
1944            if let Some(end) = end
1945                && trade.timestamp > end
1946            {
1947                continue;
1948            }
1949
1950            let price_precision = self.get_price_precision(trade.symbol)?;
1951
1952            match parse_trade(trade, price_precision, ts_init) {
1953                Ok(trade) => parsed_trades.push(trade),
1954                Err(e) => tracing::error!("Failed to parse trade: {e}"),
1955            }
1956        }
1957
1958        Ok(parsed_trades)
1959    }
1960
1961    /// Request bars for the given bar type.
1962    ///
1963    /// # Errors
1964    ///
1965    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1966    /// unsupported by BitMEX.
1967    pub async fn request_bars(
1968        &self,
1969        mut bar_type: BarType,
1970        start: Option<DateTime<Utc>>,
1971        end: Option<DateTime<Utc>>,
1972        limit: Option<u32>,
1973        partial: bool,
1974    ) -> anyhow::Result<Vec<Bar>> {
1975        bar_type = bar_type.standard();
1976
1977        anyhow::ensure!(
1978            bar_type.aggregation_source() == AggregationSource::External,
1979            "Only EXTERNAL aggregation bars are supported"
1980        );
1981        anyhow::ensure!(
1982            bar_type.spec().price_type == PriceType::Last,
1983            "Only LAST price type bars are supported"
1984        );
1985        if let (Some(start), Some(end)) = (start, end) {
1986            anyhow::ensure!(
1987                start < end,
1988                "Invalid time range: start={start:?} end={end:?}"
1989            );
1990        }
1991
1992        let spec = bar_type.spec();
1993        let bin_size = match (spec.aggregation, spec.step.get()) {
1994            (BarAggregation::Minute, 1) => "1m",
1995            (BarAggregation::Minute, 5) => "5m",
1996            (BarAggregation::Hour, 1) => "1h",
1997            (BarAggregation::Day, 1) => "1d",
1998            _ => anyhow::bail!(
1999                "BitMEX does not support {}-{:?}-{:?} bars",
2000                spec.step.get(),
2001                spec.aggregation,
2002                spec.price_type,
2003            ),
2004        };
2005
2006        let instrument_id = bar_type.instrument_id();
2007        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2008
2009        let mut params = GetTradeBucketedParamsBuilder::default();
2010        params.symbol(instrument_id.symbol.as_str());
2011        params.bin_size(bin_size);
2012        if partial {
2013            params.partial(true);
2014        }
2015        if let Some(start) = start {
2016            params.start_time(start);
2017        }
2018        if let Some(end) = end {
2019            params.end_time(end);
2020        }
2021        if let Some(limit) = limit {
2022            let clamped_limit = limit.min(1000);
2023            if limit > 1000 {
2024                tracing::warn!(
2025                    limit,
2026                    clamped_limit,
2027                    "BitMEX bar request limit exceeds venue maximum; clamping",
2028                );
2029            }
2030            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2031        }
2032        params.reverse(false);
2033        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2034
2035        let response = self.inner.get_trade_bucketed(params).await?;
2036        let ts_init = self.generate_ts_init();
2037        let mut bars = Vec::new();
2038
2039        for bin in response {
2040            if let Some(start) = start
2041                && bin.timestamp < start
2042            {
2043                continue;
2044            }
2045            if let Some(end) = end
2046                && bin.timestamp > end
2047            {
2048                continue;
2049            }
2050            if bin.symbol != instrument_id.symbol.inner() {
2051                tracing::warn!(
2052                    symbol = %bin.symbol,
2053                    expected = %instrument_id.symbol,
2054                    "Skipping trade bin for unexpected symbol",
2055                );
2056                continue;
2057            }
2058
2059            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2060                Ok(bar) => bars.push(bar),
2061                Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2062            }
2063        }
2064
2065        Ok(bars)
2066    }
2067
2068    /// Request fill reports for the given instrument.
2069    ///
2070    /// # Errors
2071    ///
2072    /// Returns an error if the HTTP request fails or parsing fails.
2073    pub async fn request_fill_reports(
2074        &self,
2075        instrument_id: Option<InstrumentId>,
2076        limit: Option<u32>,
2077    ) -> anyhow::Result<Vec<FillReport>> {
2078        let mut params = GetExecutionParamsBuilder::default();
2079        if let Some(instrument_id) = instrument_id {
2080            params.symbol(instrument_id.symbol.as_str());
2081        }
2082        if let Some(limit) = limit {
2083            params.count(limit as i32);
2084        } else {
2085            params.count(500); // Default count
2086        }
2087        params.reverse(true); // Get newest fills first
2088
2089        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2090
2091        let response = self.inner.get_executions(params).await?;
2092
2093        let ts_init = self.generate_ts_init();
2094
2095        let mut reports = Vec::new();
2096
2097        for exec in response {
2098            // Skip executions without symbol (e.g., CancelReject)
2099            let Some(symbol) = exec.symbol else {
2100                tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2101                continue;
2102            };
2103            let symbol_str = symbol.to_string();
2104
2105            let instrument = match self.instrument_from_cache(symbol) {
2106                Ok(instrument) => instrument,
2107                Err(e) => {
2108                    tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2109                    continue;
2110                }
2111            };
2112
2113            match parse_fill_report(exec, &instrument, ts_init) {
2114                Ok(report) => reports.push(report),
2115                Err(e) => {
2116                    // Log at debug level for expected skip cases
2117                    let error_msg = e.to_string();
2118                    if error_msg.starts_with("Skipping non-trade execution")
2119                        || error_msg.starts_with("Skipping execution without order_id")
2120                    {
2121                        tracing::debug!("{e}");
2122                    } else {
2123                        tracing::error!("Failed to parse fill report: {e}");
2124                    }
2125                }
2126            }
2127        }
2128
2129        Ok(reports)
2130    }
2131
2132    /// Request position reports.
2133    ///
2134    /// # Errors
2135    ///
2136    /// Returns an error if the HTTP request fails or parsing fails.
2137    pub async fn request_position_status_reports(
2138        &self,
2139    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2140        let params = GetPositionParamsBuilder::default()
2141            .count(500) // Default count
2142            .build()
2143            .map_err(|e| anyhow::anyhow!(e))?;
2144
2145        let response = self.inner.get_positions(params).await?;
2146
2147        let ts_init = self.generate_ts_init();
2148
2149        let mut reports = Vec::new();
2150
2151        for pos in response {
2152            let symbol = pos.symbol;
2153            let instrument = match self.instrument_from_cache(symbol) {
2154                Ok(instrument) => instrument,
2155                Err(e) => {
2156                    tracing::error!(
2157                        symbol = pos.symbol.as_str(),
2158                        "Instrument not found in cache for position parsing: {e}"
2159                    );
2160                    continue;
2161                }
2162            };
2163
2164            match parse_position_report(pos, &instrument, ts_init) {
2165                Ok(report) => reports.push(report),
2166                Err(e) => tracing::error!("Failed to parse position report: {e}"),
2167            }
2168        }
2169
2170        Ok(reports)
2171    }
2172
2173    /// Update position leverage.
2174    ///
2175    /// # Errors
2176    ///
2177    /// - Credentials are missing.
2178    /// - The request fails.
2179    /// - The API returns an error.
2180    pub async fn update_position_leverage(
2181        &self,
2182        symbol: &str,
2183        leverage: f64,
2184    ) -> anyhow::Result<PositionStatusReport> {
2185        let params = PostPositionLeverageParams {
2186            symbol: symbol.to_string(),
2187            leverage,
2188            target_account_id: None,
2189        };
2190
2191        let response = self.inner.update_position_leverage(params).await?;
2192
2193        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2194        let ts_init = self.generate_ts_init();
2195
2196        parse_position_report(response, &instrument, ts_init)
2197    }
2198}
2199
2200#[cfg(test)]
2201mod tests {
2202    use nautilus_core::UUID4;
2203    use nautilus_model::enums::OrderStatus;
2204    use rstest::rstest;
2205    use serde_json::json;
2206
2207    use super::*;
2208
2209    fn build_report(
2210        client_order_id: &str,
2211        venue_order_id: &str,
2212        contingency_type: ContingencyType,
2213        order_list_id: Option<&str>,
2214    ) -> OrderStatusReport {
2215        let mut report = OrderStatusReport::new(
2216            AccountId::from("BITMEX-1"),
2217            InstrumentId::from("XBTUSD.BITMEX"),
2218            Some(ClientOrderId::from(client_order_id)),
2219            VenueOrderId::from(venue_order_id),
2220            OrderSide::Buy,
2221            OrderType::Limit,
2222            TimeInForce::Gtc,
2223            OrderStatus::Accepted,
2224            Quantity::new(100.0, 0),
2225            Quantity::default(),
2226            UnixNanos::from(1_u64),
2227            UnixNanos::from(1_u64),
2228            UnixNanos::from(1_u64),
2229            Some(UUID4::new()),
2230        );
2231
2232        if let Some(id) = order_list_id {
2233            report = report.with_order_list_id(OrderListId::from(id));
2234        }
2235
2236        report.with_contingency_type(contingency_type)
2237    }
2238
2239    #[rstest]
2240    fn test_sign_request_generates_correct_headers() {
2241        let client = BitmexRawHttpClient::with_credentials(
2242            "test_api_key".to_string(),
2243            "test_api_secret".to_string(),
2244            "http://localhost:8080".to_string(),
2245            Some(60),
2246            None, // max_retries
2247            None, // retry_delay_ms
2248            None, // retry_delay_max_ms
2249            None, // recv_window_ms
2250            None, // max_requests_per_second
2251            None, // max_requests_per_minute
2252            None, // proxy_url
2253        )
2254        .expect("Failed to create test client");
2255
2256        let headers = client
2257            .sign_request(&Method::GET, "/api/v1/order", None)
2258            .unwrap();
2259
2260        assert!(headers.contains_key("api-key"));
2261        assert!(headers.contains_key("api-signature"));
2262        assert!(headers.contains_key("api-expires"));
2263        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2264    }
2265
2266    #[rstest]
2267    fn test_sign_request_with_body() {
2268        let client = BitmexRawHttpClient::with_credentials(
2269            "test_api_key".to_string(),
2270            "test_api_secret".to_string(),
2271            "http://localhost:8080".to_string(),
2272            Some(60),
2273            None, // max_retries
2274            None, // retry_delay_ms
2275            None, // retry_delay_max_ms
2276            None, // recv_window_ms
2277            None, // max_requests_per_second
2278            None, // max_requests_per_minute
2279            None, // proxy_url
2280        )
2281        .expect("Failed to create test client");
2282
2283        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2284        let body_bytes = serde_json::to_vec(&body).unwrap();
2285
2286        let headers_without_body = client
2287            .sign_request(&Method::POST, "/api/v1/order", None)
2288            .unwrap();
2289        let headers_with_body = client
2290            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2291            .unwrap();
2292
2293        // Signatures should be different when body is included
2294        assert_ne!(
2295            headers_without_body.get("api-signature").unwrap(),
2296            headers_with_body.get("api-signature").unwrap()
2297        );
2298    }
2299
2300    #[rstest]
2301    fn test_sign_request_uses_custom_recv_window() {
2302        let client_default = BitmexRawHttpClient::with_credentials(
2303            "test_api_key".to_string(),
2304            "test_api_secret".to_string(),
2305            "http://localhost:8080".to_string(),
2306            Some(60),
2307            None,
2308            None,
2309            None,
2310            None, // Use default recv_window_ms (10000ms = 10s)
2311            None, // max_requests_per_second
2312            None, // max_requests_per_minute
2313            None, // proxy_url
2314        )
2315        .expect("Failed to create test client");
2316
2317        let client_custom = BitmexRawHttpClient::with_credentials(
2318            "test_api_key".to_string(),
2319            "test_api_secret".to_string(),
2320            "http://localhost:8080".to_string(),
2321            Some(60),
2322            None,
2323            None,
2324            None,
2325            Some(30_000), // 30 seconds
2326            None,         // max_requests_per_second
2327            None,         // max_requests_per_minute
2328            None,         // proxy_url
2329        )
2330        .expect("Failed to create test client");
2331
2332        let headers_default = client_default
2333            .sign_request(&Method::GET, "/api/v1/order", None)
2334            .unwrap();
2335        let headers_custom = client_custom
2336            .sign_request(&Method::GET, "/api/v1/order", None)
2337            .unwrap();
2338
2339        // Parse expires timestamps
2340        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2341        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2342
2343        // Verify both are valid future timestamps
2344        let now = Utc::now().timestamp();
2345        assert!(expires_default > now);
2346        assert!(expires_custom > now);
2347
2348        // Custom window should be greater than default
2349        assert!(expires_custom > expires_default);
2350
2351        // The difference should be approximately 20 seconds (30s - 10s)
2352        // Allow wider tolerance for delays between calls on slow CI runners
2353        let diff = expires_custom - expires_default;
2354        assert!((18..=25).contains(&diff));
2355    }
2356
2357    #[rstest]
2358    fn test_populate_linked_order_ids_from_order_list() {
2359        let base = "O-20250922-002219-001-000";
2360        let entry = format!("{base}-1");
2361        let stop = format!("{base}-2");
2362        let take = format!("{base}-3");
2363
2364        let mut reports = vec![
2365            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2366            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2367            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2368        ];
2369
2370        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2371
2372        assert_eq!(
2373            reports[0].linked_order_ids,
2374            Some(vec![
2375                ClientOrderId::from(stop.as_str()),
2376                ClientOrderId::from(take.as_str()),
2377            ]),
2378        );
2379        assert_eq!(
2380            reports[1].linked_order_ids,
2381            Some(vec![
2382                ClientOrderId::from(entry.as_str()),
2383                ClientOrderId::from(take.as_str()),
2384            ]),
2385        );
2386        assert_eq!(
2387            reports[2].linked_order_ids,
2388            Some(vec![
2389                ClientOrderId::from(entry.as_str()),
2390                ClientOrderId::from(stop.as_str()),
2391            ]),
2392        );
2393    }
2394
2395    #[rstest]
2396    fn test_populate_linked_order_ids_from_id_prefix() {
2397        let base = "O-20250922-002220-001-000";
2398        let entry = format!("{base}-1");
2399        let stop = format!("{base}-2");
2400        let take = format!("{base}-3");
2401
2402        let mut reports = vec![
2403            build_report(&entry, "V-1", ContingencyType::Oto, None),
2404            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2405            build_report(&take, "V-3", ContingencyType::Ouo, None),
2406        ];
2407
2408        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2409
2410        assert_eq!(
2411            reports[0].linked_order_ids,
2412            Some(vec![
2413                ClientOrderId::from(stop.as_str()),
2414                ClientOrderId::from(take.as_str()),
2415            ]),
2416        );
2417        assert_eq!(
2418            reports[1].linked_order_ids,
2419            Some(vec![
2420                ClientOrderId::from(entry.as_str()),
2421                ClientOrderId::from(take.as_str()),
2422            ]),
2423        );
2424        assert_eq!(
2425            reports[2].linked_order_ids,
2426            Some(vec![
2427                ClientOrderId::from(entry.as_str()),
2428                ClientOrderId::from(stop.as_str()),
2429            ]),
2430        );
2431    }
2432
2433    #[rstest]
2434    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2435        let base = "O-20250922-002221-001-000";
2436        let entry = format!("{base}-1");
2437        let passive = format!("{base}-2");
2438
2439        let mut reports = vec![
2440            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2441            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2442        ];
2443
2444        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2445
2446        // Non-contingent orders should not be linked
2447        assert!(reports[0].linked_order_ids.is_none());
2448
2449        // A contingent order with no other contingent peers should have contingency reset
2450        assert!(reports[1].linked_order_ids.is_none());
2451        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2452    }
2453}