nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{Arc, Mutex},
29};
30
31use ahash::AHashMap;
32use chrono::{DateTime, Utc};
33use nautilus_core::{
34    MUTEX_POISONED, UnixNanos,
35    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
36    env::get_env_var,
37    time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40    data::{Bar, BarType, TradeTick},
41    enums::{
42        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
43        TimeInForce, TriggerType,
44    },
45    events::AccountState,
46    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
47    instruments::{Instrument as InstrumentTrait, InstrumentAny},
48    reports::{FillReport, OrderStatusReport, PositionStatusReport},
49    types::{Price, Quantity},
50};
51use nautilus_network::{
52    http::HttpClient,
53    ratelimiter::quota::Quota,
54    retry::{RetryConfig, RetryManager},
55};
56use reqwest::{Method, StatusCode, header::USER_AGENT};
57use serde::{Deserialize, Serialize, de::DeserializeOwned};
58use serde_json::Value;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use super::{
63    error::{BitmexErrorResponse, BitmexHttpError},
64    models::{
65        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
66        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
67    },
68    query::{
69        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
70        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
71        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
72        PostPositionLeverageParams, PutOrderParams,
73    },
74};
75use crate::{
76    common::{
77        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
78        credential::Credential,
79        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
80        parse::{parse_account_state, quantity_to_u32},
81    },
82    http::{
83        parse::{
84            parse_fill_report, parse_instrument_any, parse_order_status_report,
85            parse_position_report, parse_trade, parse_trade_bin,
86        },
87        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
88    },
89    websocket::messages::BitmexMarginMsg,
90};
91
// Default BitMEX REST API rate limits.
//
// Two limits are applied together: a per-minute limit (which differs between
// authenticated and unauthenticated clients) and a per-second burst limit.

/// Default burst limit: requests allowed per second.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;

/// Default requests allowed per minute for a client created with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;

/// Default requests allowed per minute for a client created without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";

/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
103
104/// Represents a BitMEX HTTP response.
105#[derive(Debug, Serialize, Deserialize)]
106pub struct BitmexResponse<T> {
107    /// The typed data returned by the BitMEX endpoint.
108    pub data: Vec<T>,
109}
110
111/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
112///
113/// This client wraps the underlying [`HttpClient`] to handle functionality
114/// specific to BitMEX, such as request signing (for authenticated endpoints),
115/// forming request URLs, and deserializing responses into specific data models.
116///
117/// # Connection Management
118///
119/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
120/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
121/// reused for subsequent requests to minimize latency.
122///
123/// # Rate Limiting
124///
125/// BitMEX enforces the following rate limits:
126/// - 120 requests per minute for authenticated users (30 for unauthenticated).
127/// - 10 requests per second burst limit for certain endpoints (order management).
128///
129/// The client automatically respects these limits through the configured quota.
130#[derive(Debug, Clone)]
131pub struct BitmexHttpInnerClient {
132    base_url: String,
133    client: HttpClient,
134    credential: Option<Credential>,
135    recv_window_ms: u64,
136    retry_manager: RetryManager<BitmexHttpError>,
137    cancellation_token: CancellationToken,
138}
139
140impl Default for BitmexHttpInnerClient {
141    fn default() -> Self {
142        Self::new(None, Some(60), None, None, None, None, None, None)
143            .expect("Failed to create default BitmexHttpInnerClient")
144    }
145}
146
147impl BitmexHttpInnerClient {
148    /// Cancel all pending HTTP requests.
149    pub fn cancel_all_requests(&self) {
150        self.cancellation_token.cancel();
151    }
152
153    /// Get the cancellation token for this client.
154    pub fn cancellation_token(&self) -> &CancellationToken {
155        &self.cancellation_token
156    }
157    /// Creates a new [`BitmexHttpInnerClient`] using the default BitMEX HTTP URL,
158    /// optionally overridden with a custom base URL.
159    ///
160    /// This version of the client has **no credentials**, so it can only
161    /// call publicly accessible endpoints.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the retry manager cannot be created.
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        base_url: Option<String>,
169        timeout_secs: Option<u64>,
170        max_retries: Option<u32>,
171        retry_delay_ms: Option<u64>,
172        retry_delay_max_ms: Option<u64>,
173        recv_window_ms: Option<u64>,
174        max_requests_per_second: Option<u32>,
175        max_requests_per_minute: Option<u32>,
176    ) -> Result<Self, BitmexHttpError> {
177        let retry_config = RetryConfig {
178            max_retries: max_retries.unwrap_or(3),
179            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
180            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
181            backoff_factor: 2.0,
182            jitter_ms: 1000,
183            operation_timeout_ms: Some(60_000),
184            immediate_first: false,
185            max_elapsed_ms: Some(180_000),
186        };
187
188        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
189            BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
190        })?;
191
192        let max_req_per_sec =
193            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
194        let max_req_per_min =
195            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
196
197        Ok(Self {
198            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
199            client: HttpClient::new(
200                Self::default_headers(),
201                vec![],
202                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
203                Some(Self::default_quota(max_req_per_sec)),
204                timeout_secs,
205            ),
206            credential: None,
207            recv_window_ms: recv_window_ms.unwrap_or(10_000),
208            retry_manager,
209            cancellation_token: CancellationToken::new(),
210        })
211    }
212
213    /// Creates a new [`BitmexHttpInnerClient`] configured with credentials
214    /// for authenticated requests, optionally using a custom base URL.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the retry manager cannot be created.
219    #[allow(clippy::too_many_arguments)]
220    pub fn with_credentials(
221        api_key: String,
222        api_secret: String,
223        base_url: String,
224        timeout_secs: Option<u64>,
225        max_retries: Option<u32>,
226        retry_delay_ms: Option<u64>,
227        retry_delay_max_ms: Option<u64>,
228        recv_window_ms: Option<u64>,
229        max_requests_per_second: Option<u32>,
230        max_requests_per_minute: Option<u32>,
231    ) -> Result<Self, BitmexHttpError> {
232        let retry_config = RetryConfig {
233            max_retries: max_retries.unwrap_or(3),
234            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
235            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
236            backoff_factor: 2.0,
237            jitter_ms: 1000,
238            operation_timeout_ms: Some(60_000),
239            immediate_first: false,
240            max_elapsed_ms: Some(180_000),
241        };
242
243        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
244            BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
245        })?;
246
247        let max_req_per_sec =
248            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
249        let max_req_per_min =
250            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
251
252        Ok(Self {
253            base_url,
254            client: HttpClient::new(
255                Self::default_headers(),
256                vec![],
257                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
258                Some(Self::default_quota(max_req_per_sec)),
259                timeout_secs,
260            ),
261            credential: Some(Credential::new(api_key, api_secret)),
262            recv_window_ms: recv_window_ms.unwrap_or(10_000),
263            retry_manager,
264            cancellation_token: CancellationToken::new(),
265        })
266    }
267
268    fn default_headers() -> HashMap<String, String> {
269        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270    }
271
272    fn default_quota(max_requests_per_second: u32) -> Quota {
273        Quota::per_second(
274            NonZeroU32::new(max_requests_per_second)
275                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276        )
277    }
278
279    fn rate_limiter_quotas(
280        max_requests_per_second: u32,
281        max_requests_per_minute: u32,
282    ) -> Vec<(String, Quota)> {
283        let per_sec_quota = Quota::per_second(
284            NonZeroU32::new(max_requests_per_second)
285                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286        );
287        let per_min_quota =
288            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290            }));
291
292        vec![
293            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295        ]
296    }
297
298    fn rate_limit_keys() -> Vec<Ustr> {
299        vec![
300            Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301            Ustr::from(BITMEX_MINUTE_RATE_KEY),
302        ]
303    }
304
305    fn sign_request(
306        &self,
307        method: &Method,
308        endpoint: &str,
309        body: Option<&[u8]>,
310    ) -> Result<HashMap<String, String>, BitmexHttpError> {
311        let credential = self
312            .credential
313            .as_ref()
314            .ok_or(BitmexHttpError::MissingCredentials)?;
315
316        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
317        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
318
319        let full_path = if endpoint.starts_with("/api/v1") {
320            endpoint.to_string()
321        } else {
322            format!("/api/v1{endpoint}")
323        };
324
325        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
326
327        let mut headers = HashMap::new();
328        headers.insert("api-expires".to_string(), expires.to_string());
329        headers.insert("api-key".to_string(), credential.api_key.to_string());
330        headers.insert("api-signature".to_string(), signature);
331
332        // Add Content-Type header for form-encoded body
333        if body.is_some()
334            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
335        {
336            headers.insert(
337                "Content-Type".to_string(),
338                "application/x-www-form-urlencoded".to_string(),
339            );
340        }
341
342        Ok(headers)
343    }
344
345    async fn send_request<T: DeserializeOwned>(
346        &self,
347        method: Method,
348        endpoint: &str,
349        body: Option<Vec<u8>>,
350        authenticate: bool,
351    ) -> Result<T, BitmexHttpError> {
352        let endpoint = endpoint.to_string();
353        let url = format!("{}{endpoint}", self.base_url);
354        let method_clone = method.clone();
355        let body_clone = body.clone();
356
357        let operation = || {
358            let url = url.clone();
359            let method = method_clone.clone();
360            let body = body_clone.clone();
361            let endpoint = endpoint.clone();
362
363            async move {
364                let headers = if authenticate {
365                    Some(self.sign_request(&method, endpoint.as_str(), body.as_deref())?)
366                } else {
367                    None
368                };
369
370                let rate_keys = Self::rate_limit_keys();
371                let resp = self
372                    .client
373                    .request_with_ustr_keys(method, url, headers, body, None, Some(rate_keys))
374                    .await?;
375
376                if resp.status.is_success() {
377                    serde_json::from_slice(&resp.body).map_err(Into::into)
378                } else if let Ok(error_resp) =
379                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
380                {
381                    Err(error_resp.into())
382                } else {
383                    Err(BitmexHttpError::UnexpectedStatus {
384                        status: StatusCode::from_u16(resp.status.as_u16())
385                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
386                        body: String::from_utf8_lossy(&resp.body).to_string(),
387                    })
388                }
389            }
390        };
391
392        // Retry strategy based on BitMEX error responses and HTTP status codes:
393        //
394        // 1. Network errors: always retry (transient connection issues)
395        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
396        // 3. BitMEX JSON errors with specific handling:
397        //    - "RateLimitError": explicit rate limit error from BitMEX
398        //    - "HTTPError": generic error name used by BitMEX for various issues
399        //      Only retry if message contains "rate limit" to avoid retrying
400        //      non-transient errors like authentication failures, validation errors,
401        //      insufficient balance, etc. which also return as "HTTPError"
402        //
403        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
404        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
405        // be retried. We only retry when the message explicitly mentions rate limiting.
406        //
407        // See tests in tests/http.rs for retry behavior validation
408        let should_retry = |error: &BitmexHttpError| -> bool {
409            match error {
410                BitmexHttpError::NetworkError(_) => true,
411                BitmexHttpError::UnexpectedStatus { status, .. } => {
412                    status.as_u16() >= 500 || status.as_u16() == 429
413                }
414                BitmexHttpError::BitmexError {
415                    error_name,
416                    message,
417                } => {
418                    error_name == "RateLimitError"
419                        || (error_name == "HTTPError"
420                            && message.to_lowercase().contains("rate limit"))
421                }
422                _ => false,
423            }
424        };
425
426        let create_error = |msg: String| -> BitmexHttpError {
427            if msg == "canceled" {
428                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
429            } else {
430                BitmexHttpError::NetworkError(msg)
431            }
432        };
433
434        self.retry_manager
435            .execute_with_retry_with_cancel(
436                endpoint.as_str(),
437                operation,
438                should_retry,
439                create_error,
440                &self.cancellation_token,
441            )
442            .await
443    }
444
445    /// Get all instruments.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
450    pub async fn http_get_instruments(
451        &self,
452        active_only: bool,
453    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
454        let path = if active_only {
455            "/instrument/active"
456        } else {
457            "/instrument"
458        };
459        self.send_request(Method::GET, path, None, false).await
460    }
461
462    /// Requests the current server time from BitMEX.
463    ///
464    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
465    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the HTTP request fails or if the response body
470    /// cannot be parsed into [`BitmexApiInfo`].
471    pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
472        let response: BitmexApiInfo = self.send_request(Method::GET, "", None, false).await?;
473        Ok(response.timestamp)
474    }
475
476    /// Get the instrument definition for the specified symbol.
477    ///
478    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
479    /// a single symbol is requested. This helper returns the first element of
480    /// that array and yields `Ok(None)` when the venue returns an empty list
481    /// (e.g. unknown symbol).
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the request fails or the payload cannot be deserialized.
486    pub async fn http_get_instrument(
487        &self,
488        symbol: &str,
489    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
490        let path = &format!("/instrument?symbol={symbol}");
491        let instruments: Vec<BitmexInstrument> =
492            self.send_request(Method::GET, path, None, false).await?;
493
494        Ok(instruments.into_iter().next())
495    }
496
497    /// Get user wallet information.
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
502    pub async fn http_get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
503        let endpoint = "/user/wallet";
504        self.send_request(Method::GET, endpoint, None, true).await
505    }
506
507    /// Get user margin information.
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
512    pub async fn http_get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
513        let path = format!("/user/margin?currency={currency}");
514        self.send_request(Method::GET, &path, None, true).await
515    }
516
517    /// Get historical trades.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
522    ///
523    /// # Panics
524    ///
525    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
526    pub async fn http_get_trades(
527        &self,
528        params: GetTradeParams,
529    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
530        let query = serde_urlencoded::to_string(&params).map_err(|e| {
531            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
532        })?;
533        let path = format!("/trade?{query}");
534        self.send_request(Method::GET, &path, None, true).await
535    }
536
537    /// Get bucketed (aggregated) trade data.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
542    pub async fn http_get_trade_bucketed(
543        &self,
544        params: GetTradeBucketedParams,
545    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
546        let query = serde_urlencoded::to_string(&params).map_err(|e| {
547            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
548        })?;
549        let path = format!("/trade/bucketed?{query}");
550        self.send_request(Method::GET, &path, None, true).await
551    }
552
553    /// Get user orders.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
558    ///
559    /// # Panics
560    ///
561    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
562    pub async fn http_get_orders(
563        &self,
564        params: GetOrderParams,
565    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
566        let query = serde_urlencoded::to_string(&params).map_err(|e| {
567            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
568        })?;
569        let path = format!("/order?{query}");
570        self.send_request(Method::GET, &path, None, true).await
571    }
572
573    /// Place a new order.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
578    ///
579    /// # Panics
580    ///
581    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
582    pub async fn http_place_order(
583        &self,
584        params: PostOrderParams,
585    ) -> Result<Value, BitmexHttpError> {
586        // BitMEX spec requires form-encoded body for POST /order
587        let body = serde_urlencoded::to_string(&params)
588            .map_err(|e| {
589                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
590            })?
591            .into_bytes();
592        let path = "/order";
593        self.send_request(Method::POST, path, Some(body), true)
594            .await
595    }
596
597    /// Cancel user orders.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
602    ///
603    /// # Panics
604    ///
605    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
606    pub async fn http_cancel_orders(
607        &self,
608        params: DeleteOrderParams,
609    ) -> Result<Value, BitmexHttpError> {
610        // BitMEX spec requires form-encoded body for DELETE /order
611        let body = serde_urlencoded::to_string(&params)
612            .map_err(|e| {
613                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
614            })?
615            .into_bytes();
616        let path = "/order";
617        self.send_request(Method::DELETE, path, Some(body), true)
618            .await
619    }
620
621    /// Amend an existing order.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
626    ///
627    /// # Panics
628    ///
629    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
630    pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
631        // BitMEX spec requires form-encoded body for PUT /order
632        let body = serde_urlencoded::to_string(&params)
633            .map_err(|e| {
634                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
635            })?
636            .into_bytes();
637        let path = "/order";
638        self.send_request(Method::PUT, path, Some(body), true).await
639    }
640
641    /// Cancel all orders.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
646    ///
647    /// # Panics
648    ///
649    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
650    ///
651    /// # References
652    ///
653    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
654    pub async fn http_cancel_all_orders(
655        &self,
656        params: DeleteAllOrdersParams,
657    ) -> Result<Value, BitmexHttpError> {
658        let query = serde_urlencoded::to_string(&params).map_err(|e| {
659            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
660        })?;
661        let path = format!("/order/all?{query}");
662        self.send_request(Method::DELETE, &path, None, true).await
663    }
664
665    /// Get user executions.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
670    ///
671    /// # Panics
672    ///
673    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
674    pub async fn http_get_executions(
675        &self,
676        params: GetExecutionParams,
677    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
678        let query = serde_urlencoded::to_string(&params).map_err(|e| {
679            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
680        })?;
681        let path = format!("/execution/tradeHistory?{query}");
682        self.send_request(Method::GET, &path, None, true).await
683    }
684
685    /// Get user positions.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
690    ///
691    /// # Panics
692    ///
693    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
694    pub async fn http_get_positions(
695        &self,
696        params: GetPositionParams,
697    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
698        let query = serde_urlencoded::to_string(&params).map_err(|e| {
699            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700        })?;
701        let path = format!("/position?{query}");
702        self.send_request(Method::GET, &path, None, true).await
703    }
704
705    /// Update position leverage.
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
710    ///
711    /// # Panics
712    ///
713    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
714    pub async fn http_update_position_leverage(
715        &self,
716        params: PostPositionLeverageParams,
717    ) -> Result<BitmexPosition, BitmexHttpError> {
718        // BitMEX spec requires form-encoded body for POST endpoints
719        let body = serde_urlencoded::to_string(&params)
720            .map_err(|e| {
721                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
722            })?
723            .into_bytes();
724        let path = "/position/leverage";
725        self.send_request(Method::POST, path, Some(body), true)
726            .await
727    }
728}
729
730/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
731///
732/// This is the high-level client that wraps the inner client and provides
733/// Nautilus-specific functionality for trading operations.
734#[derive(Clone, Debug)]
735#[cfg_attr(
736    feature = "python",
737    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
738)]
739pub struct BitmexHttpClient {
740    inner: Arc<BitmexHttpInnerClient>,
741    instruments_cache: Arc<Mutex<AHashMap<Ustr, InstrumentAny>>>,
742}
743
744impl Default for BitmexHttpClient {
745    fn default() -> Self {
746        Self::new(
747            None,
748            None,
749            None,
750            false,
751            Some(60),
752            None,
753            None,
754            None,
755            None,
756            None,
757            None,
758        )
759        .expect("Failed to create default BitmexHttpClient")
760    }
761}
762
763impl BitmexHttpClient {
764    /// Creates a new [`BitmexHttpClient`] instance.
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if the HTTP client cannot be created.
769    #[allow(clippy::too_many_arguments)]
770    pub fn new(
771        base_url: Option<String>,
772        api_key: Option<String>,
773        api_secret: Option<String>,
774        testnet: bool,
775        timeout_secs: Option<u64>,
776        max_retries: Option<u32>,
777        retry_delay_ms: Option<u64>,
778        retry_delay_max_ms: Option<u64>,
779        recv_window_ms: Option<u64>,
780        max_requests_per_second: Option<u32>,
781        max_requests_per_minute: Option<u32>,
782    ) -> Result<Self, BitmexHttpError> {
783        // Determine the base URL
784        let url = base_url.unwrap_or_else(|| {
785            if testnet {
786                BITMEX_HTTP_TESTNET_URL.to_string()
787            } else {
788                BITMEX_HTTP_URL.to_string()
789            }
790        });
791
792        let inner = match (api_key, api_secret) {
793            (Some(key), Some(secret)) => BitmexHttpInnerClient::with_credentials(
794                key,
795                secret,
796                url,
797                timeout_secs,
798                max_retries,
799                retry_delay_ms,
800                retry_delay_max_ms,
801                recv_window_ms,
802                max_requests_per_second,
803                max_requests_per_minute,
804            )?,
805            _ => BitmexHttpInnerClient::new(
806                Some(url),
807                timeout_secs,
808                max_retries,
809                retry_delay_ms,
810                retry_delay_max_ms,
811                recv_window_ms,
812                max_requests_per_second,
813                max_requests_per_minute,
814            )?,
815        };
816
817        Ok(Self {
818            inner: Arc::new(inner),
819            instruments_cache: Arc::new(Mutex::new(AHashMap::new())),
820        })
821    }
822
823    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
824    /// the default BitMEX HTTP base URL.
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if required environment variables are not set or invalid.
829    pub fn from_env() -> anyhow::Result<Self> {
830        Self::with_credentials(None, None, None, None, None, None, None, None, None, None)
831            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
832    }
833
834    /// Creates a new [`BitmexHttpClient`] configured with credentials
835    /// for authenticated requests.
836    ///
837    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
838    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if one credential is provided without the other.
843    #[allow(clippy::too_many_arguments)]
844    pub fn with_credentials(
845        api_key: Option<String>,
846        api_secret: Option<String>,
847        base_url: Option<String>,
848        timeout_secs: Option<u64>,
849        max_retries: Option<u32>,
850        retry_delay_ms: Option<u64>,
851        retry_delay_max_ms: Option<u64>,
852        recv_window_ms: Option<u64>,
853        max_requests_per_second: Option<u32>,
854        max_requests_per_minute: Option<u32>,
855    ) -> anyhow::Result<Self> {
856        // Determine testnet from URL first to select correct environment variables
857        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
858
859        // Choose environment variables based on testnet flag
860        let (key_var, secret_var) = if testnet {
861            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
862        } else {
863            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
864        };
865
866        let api_key = api_key.or_else(|| get_env_var(key_var).ok());
867        let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
868
869        // If we're trying to create an authenticated client, we need both key and secret
870        if api_key.is_some() && api_secret.is_none() {
871            anyhow::bail!("{secret_var} is required when {key_var} is provided");
872        }
873        if api_key.is_none() && api_secret.is_some() {
874            anyhow::bail!("{key_var} is required when {secret_var} is provided");
875        }
876
877        Self::new(
878            base_url,
879            api_key,
880            api_secret,
881            testnet,
882            timeout_secs,
883            max_retries,
884            retry_delay_ms,
885            retry_delay_max_ms,
886            recv_window_ms,
887            max_requests_per_second,
888            max_requests_per_minute,
889        )
890        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
891    }
892
893    /// Returns the base url being used by the client.
894    #[must_use]
895    pub fn base_url(&self) -> &str {
896        self.inner.base_url.as_str()
897    }
898
899    /// Returns the public API key being used by the client.
900    #[must_use]
901    pub fn api_key(&self) -> Option<&str> {
902        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
903    }
904
905    /// Requests the current server time from BitMEX.
906    ///
907    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
912    pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
913        self.inner.http_get_server_time().await
914    }
915
916    /// Generates a timestamp for initialization.
917    fn generate_ts_init(&self) -> UnixNanos {
918        get_atomic_clock_realtime().get_time_ns()
919    }
920
921    /// Check if the order has a contingency type that requires linking.
922    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
923        matches!(
924            contingency_type,
925            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
926        )
927    }
928
929    /// Check if the order is a parent in contingency relationships.
930    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
931        matches!(
932            contingency_type,
933            ContingencyType::Oco | ContingencyType::Oto
934        )
935    }
936
937    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
938    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
939        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
940        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
941        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
942        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
943
944        for report in reports.iter() {
945            let Some(client_order_id) = report.client_order_id else {
946                continue;
947            };
948
949            if let Some(order_list_id) = report.order_list_id {
950                order_list_groups
951                    .entry(order_list_id)
952                    .or_default()
953                    .push(client_order_id);
954
955                if Self::is_parent_contingency(report.contingency_type) {
956                    order_list_parents
957                        .entry(order_list_id)
958                        .or_insert(client_order_id);
959                }
960            }
961
962            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
963                && Self::is_contingent_order(report.contingency_type)
964            {
965                prefix_groups
966                    .entry(base.to_owned())
967                    .or_default()
968                    .push(client_order_id);
969
970                if Self::is_parent_contingency(report.contingency_type) {
971                    prefix_parents
972                        .entry(base.to_owned())
973                        .or_insert(client_order_id);
974                }
975            }
976        }
977
978        for report in reports.iter_mut() {
979            let Some(client_order_id) = report.client_order_id else {
980                continue;
981            };
982
983            if report.linked_order_ids.is_some() {
984                continue;
985            }
986
987            // Only process contingent orders
988            if !Self::is_contingent_order(report.contingency_type) {
989                continue;
990            }
991
992            if let Some(order_list_id) = report.order_list_id
993                && let Some(group) = order_list_groups.get(&order_list_id)
994            {
995                let mut linked: Vec<ClientOrderId> = group
996                    .iter()
997                    .copied()
998                    .filter(|candidate| candidate != &client_order_id)
999                    .collect();
1000
1001                if !linked.is_empty() {
1002                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1003                        if client_order_id != *parent_id {
1004                            linked.sort_by_key(
1005                                |candidate| {
1006                                    if candidate == parent_id { 0 } else { 1 }
1007                                },
1008                            );
1009                            report.parent_order_id = Some(*parent_id);
1010                        } else {
1011                            report.parent_order_id = None;
1012                        }
1013                    } else {
1014                        report.parent_order_id = None;
1015                    }
1016
1017                    tracing::trace!(
1018                        client_order_id = ?client_order_id,
1019                        order_list_id = ?order_list_id,
1020                        contingency_type = ?report.contingency_type,
1021                        linked_order_ids = ?linked,
1022                        "BitMEX linked ids sourced from order list id",
1023                    );
1024                    report.linked_order_ids = Some(linked);
1025                    continue;
1026                }
1027
1028                tracing::trace!(
1029                    client_order_id = ?client_order_id,
1030                    order_list_id = ?order_list_id,
1031                    contingency_type = ?report.contingency_type,
1032                    order_list_group = ?group,
1033                    "BitMEX order list id group had no peers",
1034                );
1035                report.parent_order_id = None;
1036            } else if report.order_list_id.is_none() {
1037                report.parent_order_id = None;
1038            }
1039
1040            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1041                && let Some(group) = prefix_groups.get(base)
1042            {
1043                let mut linked: Vec<ClientOrderId> = group
1044                    .iter()
1045                    .copied()
1046                    .filter(|candidate| candidate != &client_order_id)
1047                    .collect();
1048
1049                if !linked.is_empty() {
1050                    if let Some(parent_id) = prefix_parents.get(base) {
1051                        if client_order_id != *parent_id {
1052                            linked.sort_by_key(
1053                                |candidate| {
1054                                    if candidate == parent_id { 0 } else { 1 }
1055                                },
1056                            );
1057                            report.parent_order_id = Some(*parent_id);
1058                        } else {
1059                            report.parent_order_id = None;
1060                        }
1061                    } else {
1062                        report.parent_order_id = None;
1063                    }
1064
1065                    tracing::trace!(
1066                        client_order_id = ?client_order_id,
1067                        contingency_type = ?report.contingency_type,
1068                        base = base,
1069                        linked_order_ids = ?linked,
1070                        "BitMEX linked ids constructed from client order id prefix",
1071                    );
1072                    report.linked_order_ids = Some(linked);
1073                    continue;
1074                }
1075
1076                tracing::trace!(
1077                    client_order_id = ?client_order_id,
1078                    contingency_type = ?report.contingency_type,
1079                    base = base,
1080                    prefix_group = ?group,
1081                    "BitMEX client order id prefix group had no peers",
1082                );
1083                report.parent_order_id = None;
1084            } else if client_order_id.as_str().contains('-') {
1085                report.parent_order_id = None;
1086            }
1087
1088            if Self::is_contingent_order(report.contingency_type) {
1089                tracing::warn!(
1090                    client_order_id = ?report.client_order_id,
1091                    order_list_id = ?report.order_list_id,
1092                    contingency_type = ?report.contingency_type,
1093                    "BitMEX order status report missing linked ids after grouping",
1094                );
1095                report.contingency_type = ContingencyType::NoContingency;
1096                report.parent_order_id = None;
1097            }
1098
1099            report.linked_order_ids = None;
1100        }
1101    }
1102
1103    /// Cancel all pending HTTP requests.
1104    pub fn cancel_all_requests(&self) {
1105        self.inner.cancel_all_requests();
1106    }
1107
1108    /// Get the cancellation token for this client.
1109    pub fn cancellation_token(&self) -> CancellationToken {
1110        self.inner.cancellation_token().clone()
1111    }
1112
1113    /// Adds an instrument to the cache for precision lookups.
1114    ///
1115    /// # Panics
1116    ///
1117    /// Panics if the instruments cache mutex is poisoned.
1118    pub fn add_instrument(&self, instrument: InstrumentAny) {
1119        self.instruments_cache
1120            .lock()
1121            .unwrap()
1122            .insert(instrument.raw_symbol().inner(), instrument);
1123    }
1124
1125    /// Request a single instrument and parse it into a Nautilus type.
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1130    /// successfully, `Ok(None)` when the instrument is unknown or the payload
1131    /// cannot be converted into a Nautilus `Instrument`.
1132    pub async fn request_instrument(
1133        &self,
1134        instrument_id: InstrumentId,
1135    ) -> anyhow::Result<Option<InstrumentAny>> {
1136        let response = self
1137            .inner
1138            .http_get_instrument(instrument_id.symbol.as_str())
1139            .await?;
1140
1141        let instrument = match response {
1142            Some(instrument) => instrument,
1143            None => return Ok(None),
1144        };
1145
1146        let ts_init = self.generate_ts_init();
1147
1148        Ok(parse_instrument_any(&instrument, ts_init))
1149    }
1150
1151    /// Request all available instruments and parse them into Nautilus types.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the HTTP request fails or parsing fails.
1156    pub async fn request_instruments(
1157        &self,
1158        active_only: bool,
1159    ) -> anyhow::Result<Vec<InstrumentAny>> {
1160        let instruments = self.inner.http_get_instruments(active_only).await?;
1161        let ts_init = self.generate_ts_init();
1162
1163        let mut parsed_instruments = Vec::new();
1164        let mut failed_count = 0;
1165        let total_count = instruments.len();
1166
1167        for inst in instruments {
1168            if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1169                parsed_instruments.push(instrument_any);
1170            } else {
1171                failed_count += 1;
1172                tracing::error!(
1173                    "Failed to parse instrument: symbol={}, type={:?}, state={:?} - instrument will not be cached",
1174                    inst.symbol,
1175                    inst.instrument_type,
1176                    inst.state
1177                );
1178            }
1179        }
1180
1181        if failed_count > 0 {
1182            tracing::error!(
1183                "Instrument parse failures: {} failed out of {} total ({}  successfully parsed)",
1184                failed_count,
1185                total_count,
1186                parsed_instruments.len()
1187            );
1188        }
1189
1190        Ok(parsed_instruments)
1191    }
1192
1193    /// Get user wallet information.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1198    ///
1199    /// # Panics
1200    ///
1201    /// Panics if the inner mutex is poisoned.
1202    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1203        let inner = self.inner.clone();
1204        inner.http_get_wallet().await
1205    }
1206
1207    /// Get user orders.
1208    ///
1209    /// # Errors
1210    ///
1211    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1212    ///
1213    /// # Panics
1214    ///
1215    /// Panics if the inner mutex is poisoned.
1216    pub async fn get_orders(
1217        &self,
1218        params: GetOrderParams,
1219    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1220        let inner = self.inner.clone();
1221        inner.http_get_orders(params).await
1222    }
1223
1224    /// Place a new order with raw API params.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
1229    ///
1230    /// # Panics
1231    ///
1232    /// Panics if the inner mutex is poisoned.
1233    pub async fn http_place_order(
1234        &self,
1235        params: PostOrderParams,
1236    ) -> Result<Value, BitmexHttpError> {
1237        let inner = self.inner.clone();
1238        inner.http_place_order(params).await
1239    }
1240
1241    /// Cancel user orders with raw API params.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
1246    ///
1247    /// # Panics
1248    ///
1249    /// Panics if the inner mutex is poisoned.
1250    pub async fn http_cancel_orders(
1251        &self,
1252        params: DeleteOrderParams,
1253    ) -> Result<Value, BitmexHttpError> {
1254        let inner = self.inner.clone();
1255        inner.http_cancel_orders(params).await
1256    }
1257
1258    /// Amend an existing order with raw API params.
1259    ///
1260    /// # Errors
1261    ///
1262    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
1263    ///
1264    /// # Panics
1265    ///
1266    /// Panics if the inner mutex is poisoned.
1267    pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
1268        let inner = self.inner.clone();
1269        inner.http_amend_order(params).await
1270    }
1271
1272    /// Cancel all orders with raw API params.
1273    ///
1274    /// # Errors
1275    ///
1276    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1277    ///
1278    /// # Panics
1279    ///
1280    /// Panics if the inner mutex is poisoned.
1281    ///
1282    /// # References
1283    ///
1284    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
1285    pub async fn http_cancel_all_orders(
1286        &self,
1287        params: DeleteAllOrdersParams,
1288    ) -> Result<Value, BitmexHttpError> {
1289        let inner = self.inner.clone();
1290        inner.http_cancel_all_orders(params).await
1291    }
1292
1293    /// Get price precision for a symbol from the instruments cache (if found).
1294    ///
1295    /// # Errors
1296    ///
1297    /// Returns an error if the instrument is not found in the cache.
1298    ///
1299    /// # Panics
1300    ///
1301    /// Panics if the instruments cache mutex is poisoned.
1302    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1303        let cache = self.instruments_cache.lock().expect(MUTEX_POISONED);
1304        cache.get(&symbol).cloned().ok_or_else(|| {
1305            anyhow::anyhow!(
1306                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1307            )
1308        })
1309    }
1310
1311    /// Returns the cached price precision for the given symbol.
1312    ///
1313    /// # Errors
1314    ///
1315    /// Returns an error if the instrument was never cached (for example, if
1316    /// instruments were not loaded prior to use).
1317    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1318        self.instrument_from_cache(symbol)
1319            .map(|instrument| instrument.price_precision())
1320    }
1321
1322    /// Get user margin information.
1323    ///
1324    /// # Errors
1325    ///
1326    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1327    pub async fn http_get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1328        self.inner
1329            .http_get_margin(currency)
1330            .await
1331            .map_err(|e| anyhow::anyhow!(e))
1332    }
1333
1334    /// Request account state for the given account.
1335    ///
1336    /// # Errors
1337    ///
1338    /// Returns an error if the HTTP request fails or no account state is returned.
1339    pub async fn request_account_state(
1340        &self,
1341        account_id: AccountId,
1342    ) -> anyhow::Result<AccountState> {
1343        // Get margin data for XBt (Bitcoin) by default
1344        let margin = self
1345            .inner
1346            .http_get_margin("XBt")
1347            .await
1348            .map_err(|e| anyhow::anyhow!(e))?;
1349
1350        let ts_init = nautilus_core::nanos::UnixNanos::from(
1351            chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1352        );
1353
1354        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1355        let margin_msg = BitmexMarginMsg {
1356            account: margin.account,
1357            currency: margin.currency,
1358            risk_limit: margin.risk_limit,
1359            amount: margin.amount,
1360            prev_realised_pnl: margin.prev_realised_pnl,
1361            gross_comm: margin.gross_comm,
1362            gross_open_cost: margin.gross_open_cost,
1363            gross_open_premium: margin.gross_open_premium,
1364            gross_exec_cost: margin.gross_exec_cost,
1365            gross_mark_value: margin.gross_mark_value,
1366            risk_value: margin.risk_value,
1367            init_margin: margin.init_margin,
1368            maint_margin: margin.maint_margin,
1369            target_excess_margin: margin.target_excess_margin,
1370            realised_pnl: margin.realised_pnl,
1371            unrealised_pnl: margin.unrealised_pnl,
1372            wallet_balance: margin.wallet_balance,
1373            margin_balance: margin.margin_balance,
1374            margin_leverage: margin.margin_leverage,
1375            margin_used_pcnt: margin.margin_used_pcnt,
1376            excess_margin: margin.excess_margin,
1377            available_margin: margin.available_margin,
1378            withdrawable_margin: margin.withdrawable_margin,
1379            maker_fee_discount: None, // Not in HTTP response
1380            taker_fee_discount: None, // Not in HTTP response
1381            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1382            foreign_margin_balance: None,
1383            foreign_requirement: None,
1384        };
1385
1386        parse_account_state(&margin_msg, account_id, ts_init)
1387    }
1388
1389    /// Submit a new order.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if credentials are missing, the request fails, order validation fails,
1394    /// the order is rejected, or the API returns an error.
1395    #[allow(clippy::too_many_arguments)]
1396    pub async fn submit_order(
1397        &self,
1398        instrument_id: InstrumentId,
1399        client_order_id: ClientOrderId,
1400        order_side: OrderSide,
1401        order_type: OrderType,
1402        quantity: Quantity,
1403        time_in_force: TimeInForce,
1404        price: Option<Price>,
1405        trigger_price: Option<Price>,
1406        trigger_type: Option<TriggerType>,
1407        display_qty: Option<Quantity>,
1408        post_only: bool,
1409        reduce_only: bool,
1410        order_list_id: Option<OrderListId>,
1411        contingency_type: Option<ContingencyType>,
1412    ) -> anyhow::Result<OrderStatusReport> {
1413        use crate::common::enums::{
1414            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1415        };
1416
1417        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1418
1419        let mut params = super::query::PostOrderParamsBuilder::default();
1420        params.text(NAUTILUS_TRADER);
1421        params.symbol(instrument_id.symbol.as_str());
1422        params.cl_ord_id(client_order_id.as_str());
1423
1424        let side = BitmexSide::try_from_order_side(order_side)?;
1425        params.side(side);
1426
1427        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1428        params.ord_type(ord_type);
1429
1430        params.order_qty(quantity_to_u32(&quantity, &instrument));
1431
1432        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1433        params.time_in_force(tif);
1434
1435        if let Some(price) = price {
1436            params.price(price.as_f64());
1437        }
1438
1439        if let Some(trigger_price) = trigger_price {
1440            params.stop_px(trigger_price.as_f64());
1441        }
1442
1443        if let Some(display_qty) = display_qty {
1444            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1445        }
1446
1447        if let Some(order_list_id) = order_list_id {
1448            params.cl_ord_link_id(order_list_id.as_str());
1449        }
1450
1451        let mut exec_inst = Vec::new();
1452
1453        if post_only {
1454            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1455        }
1456
1457        if reduce_only {
1458            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1459        }
1460
1461        if trigger_price.is_some()
1462            && let Some(trigger_type) = trigger_type
1463        {
1464            match trigger_type {
1465                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1466                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1467                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1468                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1469            }
1470        }
1471
1472        if !exec_inst.is_empty() {
1473            params.exec_inst(exec_inst);
1474        }
1475
1476        if let Some(contingency_type) = contingency_type {
1477            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1478            params.contingency_type(bitmex_contingency);
1479        }
1480
1481        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1482
1483        let response = self.inner.http_place_order(params).await?;
1484
1485        let order: BitmexOrder = serde_json::from_value(response)?;
1486
1487        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1488            let reason = order
1489                .ord_rej_reason
1490                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1491            anyhow::bail!("Order rejected: {reason}");
1492        }
1493
1494        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1495        let ts_init = self.generate_ts_init();
1496
1497        parse_order_status_report(&order, &instrument, ts_init)
1498    }
1499
1500    /// Cancel an order.
1501    ///
1502    /// # Errors
1503    ///
1504    /// Returns an error if:
1505    /// - Credentials are missing.
1506    /// - The request fails.
1507    /// - The order doesn't exist.
1508    /// - The API returns an error.
1509    pub async fn cancel_order(
1510        &self,
1511        instrument_id: InstrumentId,
1512        client_order_id: Option<ClientOrderId>,
1513        venue_order_id: Option<VenueOrderId>,
1514    ) -> anyhow::Result<OrderStatusReport> {
1515        let mut params = super::query::DeleteOrderParamsBuilder::default();
1516        params.text(NAUTILUS_TRADER);
1517
1518        if let Some(venue_order_id) = venue_order_id {
1519            params.order_id(vec![venue_order_id.as_str().to_string()]);
1520        } else if let Some(client_order_id) = client_order_id {
1521            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1522        } else {
1523            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1524        }
1525
1526        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1527
1528        let response = self.inner.http_cancel_orders(params).await?;
1529
1530        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1531        let order = orders
1532            .into_iter()
1533            .next()
1534            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1535
1536        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1537        let ts_init = self.generate_ts_init();
1538
1539        parse_order_status_report(&order, &instrument, ts_init)
1540    }
1541
1542    /// Cancel multiple orders.
1543    ///
1544    /// # Errors
1545    ///
1546    /// Returns an error if:
1547    /// - Credentials are missing.
1548    /// - The request fails.
1549    /// - The order doesn't exist.
1550    /// - The API returns an error.
1551    pub async fn cancel_orders(
1552        &self,
1553        instrument_id: InstrumentId,
1554        client_order_ids: Option<Vec<ClientOrderId>>,
1555        venue_order_ids: Option<Vec<VenueOrderId>>,
1556    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1557        let mut params = super::query::DeleteOrderParamsBuilder::default();
1558        params.text(NAUTILUS_TRADER);
1559
1560        // BitMEX API requires either client order IDs or venue order IDs, not both
1561        // Prioritize venue order IDs if both are provided
1562        if let Some(venue_order_ids) = venue_order_ids {
1563            if venue_order_ids.is_empty() {
1564                anyhow::bail!("venue_order_ids cannot be empty");
1565            }
1566            params.order_id(
1567                venue_order_ids
1568                    .iter()
1569                    .map(|id| id.to_string())
1570                    .collect::<Vec<_>>(),
1571            );
1572        } else if let Some(client_order_ids) = client_order_ids {
1573            if client_order_ids.is_empty() {
1574                anyhow::bail!("client_order_ids cannot be empty");
1575            }
1576            params.cl_ord_id(
1577                client_order_ids
1578                    .iter()
1579                    .map(|id| id.to_string())
1580                    .collect::<Vec<_>>(),
1581            );
1582        } else {
1583            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1584        }
1585
1586        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1587
1588        let response = self.inner.http_cancel_orders(params).await?;
1589
1590        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1591
1592        let ts_init = self.generate_ts_init();
1593        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1594
1595        let mut reports = Vec::new();
1596
1597        for order in orders {
1598            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1599        }
1600
1601        Self::populate_linked_order_ids(&mut reports);
1602
1603        Ok(reports)
1604    }
1605
1606    /// Cancel all orders for an instrument and optionally an order side.
1607    ///
1608    /// # Errors
1609    ///
1610    /// Returns an error if:
1611    /// - Credentials are missing.
1612    /// - The request fails.
1613    /// - The order doesn't exist.
1614    /// - The API returns an error.
1615    pub async fn cancel_all_orders(
1616        &self,
1617        instrument_id: InstrumentId,
1618        order_side: Option<OrderSide>,
1619    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1620        let mut params = DeleteAllOrdersParamsBuilder::default();
1621        params.text(NAUTILUS_TRADER);
1622        params.symbol(instrument_id.symbol.as_str());
1623
1624        if let Some(side) = order_side {
1625            let side = BitmexSide::try_from_order_side(side)?;
1626            params.filter(serde_json::json!({
1627                "side": side
1628            }));
1629        }
1630
1631        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1632
1633        let response = self.inner.http_cancel_all_orders(params).await?;
1634
1635        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1636
1637        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1638        let ts_init = self.generate_ts_init();
1639
1640        let mut reports = Vec::new();
1641
1642        for order in orders {
1643            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1644        }
1645
1646        Self::populate_linked_order_ids(&mut reports);
1647
1648        Ok(reports)
1649    }
1650
1651    /// Modify an existing order.
1652    ///
1653    /// # Errors
1654    ///
1655    /// Returns an error if:
1656    /// - Credentials are missing.
1657    /// - The request fails.
1658    /// - The order doesn't exist.
1659    /// - The order is already closed.
1660    /// - The API returns an error.
1661    pub async fn modify_order(
1662        &self,
1663        instrument_id: InstrumentId,
1664        client_order_id: Option<ClientOrderId>,
1665        venue_order_id: Option<VenueOrderId>,
1666        quantity: Option<Quantity>,
1667        price: Option<Price>,
1668        trigger_price: Option<Price>,
1669    ) -> anyhow::Result<OrderStatusReport> {
1670        let mut params = PutOrderParamsBuilder::default();
1671        params.text(NAUTILUS_TRADER);
1672
1673        // Set order ID - prefer venue_order_id if available
1674        if let Some(venue_order_id) = venue_order_id {
1675            params.order_id(venue_order_id.as_str());
1676        } else if let Some(client_order_id) = client_order_id {
1677            params.orig_cl_ord_id(client_order_id.as_str());
1678        } else {
1679            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1680        }
1681
1682        if let Some(quantity) = quantity {
1683            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1684            params.order_qty(quantity_to_u32(&quantity, &instrument));
1685        }
1686
1687        if let Some(price) = price {
1688            params.price(price.as_f64());
1689        }
1690
1691        if let Some(trigger_price) = trigger_price {
1692            params.stop_px(trigger_price.as_f64());
1693        }
1694
1695        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1696
1697        let response = self.inner.http_amend_order(params).await?;
1698
1699        let order: BitmexOrder = serde_json::from_value(response)?;
1700
1701        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1702            let reason = order
1703                .ord_rej_reason
1704                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1705            anyhow::bail!("Order modification rejected: {reason}");
1706        }
1707
1708        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1709        let ts_init = self.generate_ts_init();
1710
1711        parse_order_status_report(&order, &instrument, ts_init)
1712    }
1713
1714    /// Query a single order by client order ID or venue order ID.
1715    ///
1716    /// # Errors
1717    ///
1718    /// Returns an error if:
1719    /// - Credentials are missing.
1720    /// - The request fails.
1721    /// - The API returns an error.
1722    pub async fn query_order(
1723        &self,
1724        instrument_id: InstrumentId,
1725        client_order_id: Option<ClientOrderId>,
1726        venue_order_id: Option<VenueOrderId>,
1727    ) -> anyhow::Result<Option<OrderStatusReport>> {
1728        let mut params = GetOrderParamsBuilder::default();
1729
1730        let filter_json = if let Some(client_order_id) = client_order_id {
1731            serde_json::json!({
1732                "clOrdID": client_order_id.to_string()
1733            })
1734        } else if let Some(venue_order_id) = venue_order_id {
1735            serde_json::json!({
1736                "orderID": venue_order_id.to_string()
1737            })
1738        } else {
1739            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1740        };
1741
1742        params.filter(filter_json);
1743        params.count(1); // Only need one order
1744
1745        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1746
1747        let response = self.inner.http_get_orders(params).await?;
1748
1749        if response.is_empty() {
1750            return Ok(None);
1751        }
1752
1753        let order = &response[0];
1754
1755        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1756        let ts_init = self.generate_ts_init();
1757
1758        let report = parse_order_status_report(order, &instrument, ts_init)?;
1759
1760        Ok(Some(report))
1761    }
1762
1763    /// Request a single order status report.
1764    ///
1765    /// # Errors
1766    ///
1767    /// Returns an error if:
1768    /// - Credentials are missing.
1769    /// - The request fails.
1770    /// - The API returns an error.
1771    pub async fn request_order_status_report(
1772        &self,
1773        instrument_id: InstrumentId,
1774        client_order_id: Option<ClientOrderId>,
1775        venue_order_id: Option<VenueOrderId>,
1776    ) -> anyhow::Result<OrderStatusReport> {
1777        let mut params = GetOrderParamsBuilder::default();
1778        params.symbol(instrument_id.symbol.as_str());
1779
1780        if let Some(venue_order_id) = venue_order_id {
1781            params.filter(serde_json::json!({
1782                "orderID": venue_order_id.as_str()
1783            }));
1784        } else if let Some(client_order_id) = client_order_id {
1785            params.filter(serde_json::json!({
1786                "clOrdID": client_order_id.as_str()
1787            }));
1788        }
1789
1790        params.count(1i32);
1791        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1792
1793        let response = self.inner.http_get_orders(params).await?;
1794
1795        let order = response
1796            .into_iter()
1797            .next()
1798            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1799
1800        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1801        let ts_init = self.generate_ts_init();
1802
1803        parse_order_status_report(&order, &instrument, ts_init)
1804    }
1805
1806    /// Request multiple order status reports.
1807    ///
1808    /// # Errors
1809    ///
1810    /// Returns an error if:
1811    /// - Credentials are missing.
1812    /// - The request fails.
1813    /// - The API returns an error.
1814    pub async fn request_order_status_reports(
1815        &self,
1816        instrument_id: Option<InstrumentId>,
1817        open_only: bool,
1818        limit: Option<u32>,
1819    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1820        let mut params = GetOrderParamsBuilder::default();
1821
1822        if let Some(instrument_id) = &instrument_id {
1823            params.symbol(instrument_id.symbol.as_str());
1824        }
1825
1826        if open_only {
1827            params.filter(serde_json::json!({
1828                "open": true
1829            }));
1830        }
1831
1832        if let Some(limit) = limit {
1833            params.count(limit as i32);
1834        } else {
1835            params.count(500); // Default count to avoid empty query
1836        }
1837
1838        params.reverse(true); // Get newest orders first
1839
1840        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1841
1842        let response = self.inner.http_get_orders(params).await?;
1843
1844        let ts_init = self.generate_ts_init();
1845
1846        let mut reports = Vec::new();
1847
1848        for order in response {
1849            // Skip orders without symbol (can happen with query responses)
1850            let Some(symbol) = order.symbol else {
1851                tracing::warn!("Order response missing symbol, skipping");
1852                continue;
1853            };
1854
1855            let instrument = self.instrument_from_cache(symbol)?;
1856
1857            match parse_order_status_report(&order, &instrument, ts_init) {
1858                Ok(report) => reports.push(report),
1859                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1860            }
1861        }
1862
1863        Self::populate_linked_order_ids(&mut reports);
1864
1865        Ok(reports)
1866    }
1867
1868    /// Request trades for the given instrument.
1869    ///
1870    /// # Errors
1871    ///
1872    /// Returns an error if the HTTP request fails or parsing fails.
1873    pub async fn request_trades(
1874        &self,
1875        instrument_id: InstrumentId,
1876        start: Option<DateTime<Utc>>,
1877        end: Option<DateTime<Utc>>,
1878        limit: Option<u32>,
1879    ) -> anyhow::Result<Vec<TradeTick>> {
1880        let mut params = GetTradeParamsBuilder::default();
1881        params.symbol(instrument_id.symbol.as_str());
1882
1883        if let Some(start) = start {
1884            params.start_time(start);
1885        }
1886
1887        if let Some(end) = end {
1888            params.end_time(end);
1889        }
1890
1891        if let (Some(start), Some(end)) = (start, end) {
1892            anyhow::ensure!(
1893                start < end,
1894                "Invalid time range: start={start:?} end={end:?}",
1895            );
1896        }
1897
1898        if let Some(limit) = limit {
1899            let clamped_limit = limit.min(1000);
1900            if limit > 1000 {
1901                tracing::warn!(
1902                    limit,
1903                    clamped_limit,
1904                    "BitMEX trade request limit exceeds venue maximum; clamping",
1905                );
1906            }
1907            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1908        }
1909        params.reverse(false);
1910        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1911
1912        let response = self.inner.http_get_trades(params).await?;
1913
1914        let ts_init = self.generate_ts_init();
1915
1916        let mut parsed_trades = Vec::new();
1917
1918        for trade in response {
1919            if let Some(start) = start
1920                && trade.timestamp < start
1921            {
1922                continue;
1923            }
1924
1925            if let Some(end) = end
1926                && trade.timestamp > end
1927            {
1928                continue;
1929            }
1930
1931            let price_precision = self.get_price_precision(trade.symbol)?;
1932
1933            match parse_trade(trade, price_precision, ts_init) {
1934                Ok(trade) => parsed_trades.push(trade),
1935                Err(e) => tracing::error!("Failed to parse trade: {e}"),
1936            }
1937        }
1938
1939        Ok(parsed_trades)
1940    }
1941
1942    /// Request bars for the given bar type.
1943    ///
1944    /// # Errors
1945    ///
1946    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1947    /// unsupported by BitMEX.
1948    pub async fn request_bars(
1949        &self,
1950        mut bar_type: BarType,
1951        start: Option<DateTime<Utc>>,
1952        end: Option<DateTime<Utc>>,
1953        limit: Option<u32>,
1954        partial: bool,
1955    ) -> anyhow::Result<Vec<Bar>> {
1956        bar_type = bar_type.standard();
1957
1958        anyhow::ensure!(
1959            bar_type.aggregation_source() == AggregationSource::External,
1960            "Only EXTERNAL aggregation bars are supported"
1961        );
1962        anyhow::ensure!(
1963            bar_type.spec().price_type == PriceType::Last,
1964            "Only LAST price type bars are supported"
1965        );
1966        if let (Some(start), Some(end)) = (start, end) {
1967            anyhow::ensure!(
1968                start < end,
1969                "Invalid time range: start={start:?} end={end:?}"
1970            );
1971        }
1972
1973        let spec = bar_type.spec();
1974        let bin_size = match (spec.aggregation, spec.step.get()) {
1975            (BarAggregation::Minute, 1) => "1m",
1976            (BarAggregation::Minute, 5) => "5m",
1977            (BarAggregation::Hour, 1) => "1h",
1978            (BarAggregation::Day, 1) => "1d",
1979            _ => anyhow::bail!(
1980                "BitMEX does not support {}-{:?}-{:?} bars",
1981                spec.step.get(),
1982                spec.aggregation,
1983                spec.price_type,
1984            ),
1985        };
1986
1987        let instrument_id = bar_type.instrument_id();
1988        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1989
1990        let mut params = GetTradeBucketedParamsBuilder::default();
1991        params.symbol(instrument_id.symbol.as_str());
1992        params.bin_size(bin_size);
1993        if partial {
1994            params.partial(true);
1995        }
1996        if let Some(start) = start {
1997            params.start_time(start);
1998        }
1999        if let Some(end) = end {
2000            params.end_time(end);
2001        }
2002        if let Some(limit) = limit {
2003            let clamped_limit = limit.min(1000);
2004            if limit > 1000 {
2005                tracing::warn!(
2006                    limit,
2007                    clamped_limit,
2008                    "BitMEX bar request limit exceeds venue maximum; clamping",
2009                );
2010            }
2011            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2012        }
2013        params.reverse(false);
2014        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2015
2016        let response = self.inner.http_get_trade_bucketed(params).await?;
2017        let ts_init = self.generate_ts_init();
2018        let mut bars = Vec::new();
2019
2020        for bin in response {
2021            if let Some(start) = start
2022                && bin.timestamp < start
2023            {
2024                continue;
2025            }
2026            if let Some(end) = end
2027                && bin.timestamp > end
2028            {
2029                continue;
2030            }
2031            if bin.symbol != instrument_id.symbol.inner() {
2032                tracing::warn!(
2033                    symbol = %bin.symbol,
2034                    expected = %instrument_id.symbol,
2035                    "Skipping trade bin for unexpected symbol",
2036                );
2037                continue;
2038            }
2039
2040            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2041                Ok(bar) => bars.push(bar),
2042                Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2043            }
2044        }
2045
2046        Ok(bars)
2047    }
2048
2049    /// Request fill reports for the given instrument.
2050    ///
2051    /// # Errors
2052    ///
2053    /// Returns an error if the HTTP request fails or parsing fails.
2054    pub async fn request_fill_reports(
2055        &self,
2056        instrument_id: Option<InstrumentId>,
2057        limit: Option<u32>,
2058    ) -> anyhow::Result<Vec<FillReport>> {
2059        let mut params = GetExecutionParamsBuilder::default();
2060        if let Some(instrument_id) = instrument_id {
2061            params.symbol(instrument_id.symbol.as_str());
2062        }
2063        if let Some(limit) = limit {
2064            params.count(limit as i32);
2065        } else {
2066            params.count(500); // Default count
2067        }
2068        params.reverse(true); // Get newest fills first
2069
2070        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2071
2072        let response = self.inner.http_get_executions(params).await?;
2073
2074        let ts_init = self.generate_ts_init();
2075
2076        let mut reports = Vec::new();
2077
2078        for exec in response {
2079            // Skip executions without symbol (e.g., CancelReject)
2080            let Some(symbol) = exec.symbol else {
2081                tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2082                continue;
2083            };
2084            let symbol_str = symbol.to_string();
2085
2086            let instrument = match self.instrument_from_cache(symbol) {
2087                Ok(instrument) => instrument,
2088                Err(e) => {
2089                    tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2090                    continue;
2091                }
2092            };
2093
2094            match parse_fill_report(exec, &instrument, ts_init) {
2095                Ok(report) => reports.push(report),
2096                Err(e) => {
2097                    // Log at debug level for expected skip cases
2098                    let error_msg = e.to_string();
2099                    if error_msg.starts_with("Skipping non-trade execution")
2100                        || error_msg.starts_with("Skipping execution without order_id")
2101                    {
2102                        tracing::debug!("{e}");
2103                    } else {
2104                        tracing::error!("Failed to parse fill report: {e}");
2105                    }
2106                }
2107            }
2108        }
2109
2110        Ok(reports)
2111    }
2112
2113    /// Request position reports.
2114    ///
2115    /// # Errors
2116    ///
2117    /// Returns an error if the HTTP request fails or parsing fails.
2118    pub async fn request_position_status_reports(
2119        &self,
2120    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2121        let params = GetPositionParamsBuilder::default()
2122            .count(500) // Default count
2123            .build()
2124            .map_err(|e| anyhow::anyhow!(e))?;
2125
2126        let response = self.inner.http_get_positions(params).await?;
2127
2128        let ts_init = self.generate_ts_init();
2129
2130        let mut reports = Vec::new();
2131
2132        for pos in response {
2133            let symbol = Ustr::from(pos.symbol.as_str());
2134            let instrument = match self.instrument_from_cache(symbol) {
2135                Ok(instrument) => instrument,
2136                Err(e) => {
2137                    tracing::error!(
2138                        symbol = pos.symbol.as_str(),
2139                        "Instrument not found in cache for position parsing: {e}"
2140                    );
2141                    continue;
2142                }
2143            };
2144
2145            match parse_position_report(pos, &instrument, ts_init) {
2146                Ok(report) => reports.push(report),
2147                Err(e) => tracing::error!("Failed to parse position report: {e}"),
2148            }
2149        }
2150
2151        Ok(reports)
2152    }
2153
2154    /// Update position leverage.
2155    ///
2156    /// # Errors
2157    ///
2158    /// - Credentials are missing.
2159    /// - The request fails.
2160    /// - The API returns an error.
2161    pub async fn update_position_leverage(
2162        &self,
2163        symbol: &str,
2164        leverage: f64,
2165    ) -> anyhow::Result<PositionStatusReport> {
2166        let params = PostPositionLeverageParams {
2167            symbol: symbol.to_string(),
2168            leverage,
2169            target_account_id: None,
2170        };
2171
2172        let response = self.inner.http_update_position_leverage(params).await?;
2173
2174        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2175        let ts_init = self.generate_ts_init();
2176
2177        parse_position_report(response, &instrument, ts_init)
2178    }
2179}
2180
2181////////////////////////////////////////////////////////////////////////////////
2182// Tests
2183////////////////////////////////////////////////////////////////////////////////
2184
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Builds an accepted limit buy report on `XBTUSD.BITMEX` with the given identifiers
    /// and contingency type, attaching the order list ID when one is supplied.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        let report = match order_list_id {
            Some(id) => base.with_order_list_id(OrderListId::from(id)),
            None => base,
        };

        report.with_contingency_type(contingency_type)
    }

    /// Builds the expected `linked_order_ids` value from raw client order ID strings.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    /// Asserts that each of the three reports links to the other two, in report order.
    #[track_caller]
    fn assert_bracket_links(reports: &[OrderStatusReport], entry: &str, stop: &str, take: &str) {
        assert_eq!(reports[0].linked_order_ids, linked(&[stop, take]));
        assert_eq!(reports[1].linked_order_ids, linked(&[entry, take]));
        assert_eq!(reports[2].linked_order_ids, linked(&[entry, stop]));
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let signer = BitmexHttpInnerClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
        )
        .expect("Failed to create test client");

        let headers = signer
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        // Every authentication header must be present
        for name in ["api-key", "api-signature", "api-expires"] {
            assert!(headers.contains_key(name));
        }
        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let signer = BitmexHttpInnerClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
        )
        .expect("Failed to create test client");

        let payload = serde_json::to_vec(&json!({"symbol": "XBTUSD", "orderQty": 100})).unwrap();

        let bare_headers = signer
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let payload_headers = signer
            .sign_request(&Method::POST, "/api/v1/order", Some(&payload))
            .unwrap();

        // Signatures should be different when body is included
        assert_ne!(
            bare_headers.get("api-signature").unwrap(),
            payload_headers.get("api-signature").unwrap()
        );
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        // The two clients differ only in their recv window setting
        let make_client = |recv_window_ms| {
            BitmexHttpInnerClient::with_credentials(
                "test_api_key".to_string(),
                "test_api_secret".to_string(),
                "http://localhost:8080".to_string(),
                Some(60),
                None, // max_retries
                None, // retry_delay_ms
                None, // retry_delay_max_ms
                recv_window_ms,
                None, // max_requests_per_second
                None, // max_requests_per_minute
            )
            .expect("Failed to create test client")
        };

        let client_default = make_client(None); // Use default recv_window_ms (10000ms = 10s)
        let client_custom = make_client(Some(30_000)); // 30 seconds

        let headers_default = client_default
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        // Parse expires timestamps
        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

        // Verify both are valid future timestamps
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        // Custom window should be greater than default
        assert!(expires_custom > expires_default);

        // The difference should be approximately 20 seconds (30s - 10s)
        // Allow wider tolerance for delays between calls on slow CI runners
        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // All three orders share the same order list ID
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_bracket_links(&reports, &entry, &stop, &take);
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // No order list ID: only the shared client order ID prefix ties the orders together
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_bracket_links(&reports, &entry, &stop, &take);
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        let (plain, orphan) = (&reports[0], &reports[1]);

        // Non-contingent orders should not be linked
        assert!(plain.linked_order_ids.is_none());

        // A contingent order with no other contingent peers should have contingency reset
        assert!(orphan.linked_order_ids.is_none());
        assert_eq!(orphan.contingency_type, ContingencyType::NoContingency);
    }
}
2432        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2433    }
2434}