nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{Arc, Mutex},
29};
30
31use ahash::AHashMap;
32use chrono::{DateTime, Utc};
33use nautilus_core::{
34    UnixNanos,
35    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
36    env::get_env_var,
37    time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40    data::{Bar, BarType, TradeTick},
41    enums::{
42        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
43        TimeInForce, TriggerType,
44    },
45    events::AccountState,
46    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
47    instruments::{Instrument as InstrumentTrait, InstrumentAny},
48    reports::{FillReport, OrderStatusReport, PositionStatusReport},
49    types::{Price, Quantity},
50};
51use nautilus_network::{
52    http::HttpClient,
53    ratelimiter::quota::Quota,
54    retry::{RetryConfig, RetryManager},
55};
56use reqwest::{Method, StatusCode, header::USER_AGENT};
57use serde::{Deserialize, Serialize, de::DeserializeOwned};
58use serde_json::Value;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use super::{
63    error::{BitmexErrorResponse, BitmexHttpError},
64    models::{
65        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
66        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
67    },
68    query::{
69        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
70        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
71        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
72        PostPositionLeverageParams, PutOrderParams,
73    },
74};
75use crate::{
76    common::{
77        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
78        credential::Credential,
79        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
80        parse::{parse_account_state, quantity_to_u32},
81    },
82    http::{
83        parse::{
84            parse_fill_report, parse_instrument_any, parse_order_status_report,
85            parse_position_report, parse_trade, parse_trade_bin,
86        },
87        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
88    },
89    websocket::messages::BitmexMarginMsg,
90};
91
92/// Default BitMEX REST API rate limits.
93///
94/// BitMEX implements a dual-layer rate limiting system:
95/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
96/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
97const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
98const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
99const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
100
101const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
102const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
103
104/// Represents a BitMEX HTTP response.
105#[derive(Debug, Serialize, Deserialize)]
106pub struct BitmexResponse<T> {
107    /// The typed data returned by the BitMEX endpoint.
108    pub data: Vec<T>,
109}
110
111/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
112///
113/// This client wraps the underlying [`HttpClient`] to handle functionality
114/// specific to BitMEX, such as request signing (for authenticated endpoints),
115/// forming request URLs, and deserializing responses into specific data models.
116///
117/// # Connection Management
118///
119/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
120/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
121/// reused for subsequent requests to minimize latency.
122///
123/// # Rate Limiting
124///
125/// BitMEX enforces the following rate limits:
126/// - 120 requests per minute for authenticated users (30 for unauthenticated).
127/// - 10 requests per second burst limit for certain endpoints (order management).
128///
129/// The client automatically respects these limits through the configured quota.
130#[derive(Debug, Clone)]
131pub struct BitmexHttpInnerClient {
132    base_url: String,
133    client: HttpClient,
134    credential: Option<Credential>,
135    recv_window_ms: u64,
136    retry_manager: RetryManager<BitmexHttpError>,
137    cancellation_token: CancellationToken,
138}
139
140impl Default for BitmexHttpInnerClient {
141    fn default() -> Self {
142        Self::new(None, Some(60), None, None, None, None, None, None)
143            .expect("Failed to create default BitmexHttpInnerClient")
144    }
145}
146
147impl BitmexHttpInnerClient {
148    /// Cancel all pending HTTP requests.
149    pub fn cancel_all_requests(&self) {
150        self.cancellation_token.cancel();
151    }
152
153    /// Get the cancellation token for this client.
154    pub fn cancellation_token(&self) -> &CancellationToken {
155        &self.cancellation_token
156    }
157    /// Creates a new [`BitmexHttpInnerClient`] using the default BitMEX HTTP URL,
158    /// optionally overridden with a custom base URL.
159    ///
160    /// This version of the client has **no credentials**, so it can only
161    /// call publicly accessible endpoints.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the retry manager cannot be created.
166    #[allow(clippy::too_many_arguments)]
167    pub fn new(
168        base_url: Option<String>,
169        timeout_secs: Option<u64>,
170        max_retries: Option<u32>,
171        retry_delay_ms: Option<u64>,
172        retry_delay_max_ms: Option<u64>,
173        recv_window_ms: Option<u64>,
174        max_requests_per_second: Option<u32>,
175        max_requests_per_minute: Option<u32>,
176    ) -> Result<Self, BitmexHttpError> {
177        let retry_config = RetryConfig {
178            max_retries: max_retries.unwrap_or(3),
179            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
180            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
181            backoff_factor: 2.0,
182            jitter_ms: 1000,
183            operation_timeout_ms: Some(60_000),
184            immediate_first: false,
185            max_elapsed_ms: Some(180_000),
186        };
187
188        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
189            BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
190        })?;
191
192        let max_req_per_sec =
193            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
194        let max_req_per_min =
195            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
196
197        Ok(Self {
198            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
199            client: HttpClient::new(
200                Self::default_headers(),
201                vec![],
202                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
203                Some(Self::default_quota(max_req_per_sec)),
204                timeout_secs,
205            ),
206            credential: None,
207            recv_window_ms: recv_window_ms.unwrap_or(10_000),
208            retry_manager,
209            cancellation_token: CancellationToken::new(),
210        })
211    }
212
213    /// Creates a new [`BitmexHttpInnerClient`] configured with credentials
214    /// for authenticated requests, optionally using a custom base URL.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the retry manager cannot be created.
219    #[allow(clippy::too_many_arguments)]
220    pub fn with_credentials(
221        api_key: String,
222        api_secret: String,
223        base_url: String,
224        timeout_secs: Option<u64>,
225        max_retries: Option<u32>,
226        retry_delay_ms: Option<u64>,
227        retry_delay_max_ms: Option<u64>,
228        recv_window_ms: Option<u64>,
229        max_requests_per_second: Option<u32>,
230        max_requests_per_minute: Option<u32>,
231    ) -> Result<Self, BitmexHttpError> {
232        let retry_config = RetryConfig {
233            max_retries: max_retries.unwrap_or(3),
234            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
235            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
236            backoff_factor: 2.0,
237            jitter_ms: 1000,
238            operation_timeout_ms: Some(60_000),
239            immediate_first: false,
240            max_elapsed_ms: Some(180_000),
241        };
242
243        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
244            BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
245        })?;
246
247        let max_req_per_sec =
248            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
249        let max_req_per_min =
250            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
251
252        Ok(Self {
253            base_url,
254            client: HttpClient::new(
255                Self::default_headers(),
256                vec![],
257                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
258                Some(Self::default_quota(max_req_per_sec)),
259                timeout_secs,
260            ),
261            credential: Some(Credential::new(api_key, api_secret)),
262            recv_window_ms: recv_window_ms.unwrap_or(10_000),
263            retry_manager,
264            cancellation_token: CancellationToken::new(),
265        })
266    }
267
268    fn default_headers() -> HashMap<String, String> {
269        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270    }
271
272    fn default_quota(max_requests_per_second: u32) -> Quota {
273        Quota::per_second(
274            NonZeroU32::new(max_requests_per_second)
275                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276        )
277    }
278
279    fn rate_limiter_quotas(
280        max_requests_per_second: u32,
281        max_requests_per_minute: u32,
282    ) -> Vec<(String, Quota)> {
283        let per_sec_quota = Quota::per_second(
284            NonZeroU32::new(max_requests_per_second)
285                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286        );
287        let per_min_quota =
288            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290            }));
291
292        vec![
293            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295        ]
296    }
297
298    fn rate_limit_keys() -> Vec<Ustr> {
299        vec![
300            Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301            Ustr::from(BITMEX_MINUTE_RATE_KEY),
302        ]
303    }
304
305    fn sign_request(
306        &self,
307        method: &Method,
308        endpoint: &str,
309        body: Option<&[u8]>,
310    ) -> Result<HashMap<String, String>, BitmexHttpError> {
311        let credential = self
312            .credential
313            .as_ref()
314            .ok_or(BitmexHttpError::MissingCredentials)?;
315
316        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
317        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
318
319        let full_path = if endpoint.starts_with("/api/v1") {
320            endpoint.to_string()
321        } else {
322            format!("/api/v1{endpoint}")
323        };
324
325        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
326
327        let mut headers = HashMap::new();
328        headers.insert("api-expires".to_string(), expires.to_string());
329        headers.insert("api-key".to_string(), credential.api_key.to_string());
330        headers.insert("api-signature".to_string(), signature);
331
332        // Add Content-Type header for form-encoded body
333        if body.is_some()
334            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
335        {
336            headers.insert(
337                "Content-Type".to_string(),
338                "application/x-www-form-urlencoded".to_string(),
339            );
340        }
341
342        Ok(headers)
343    }
344
345    async fn send_request<T: DeserializeOwned>(
346        &self,
347        method: Method,
348        endpoint: &str,
349        body: Option<Vec<u8>>,
350        authenticate: bool,
351    ) -> Result<T, BitmexHttpError> {
352        let endpoint = endpoint.to_string();
353        let url = format!("{}{endpoint}", self.base_url);
354        let method_clone = method.clone();
355        let body_clone = body.clone();
356
357        let operation = || {
358            let url = url.clone();
359            let method = method_clone.clone();
360            let body = body_clone.clone();
361            let endpoint = endpoint.clone();
362
363            async move {
364                let headers = if authenticate {
365                    Some(self.sign_request(&method, endpoint.as_str(), body.as_deref())?)
366                } else {
367                    None
368                };
369
370                let rate_keys = Self::rate_limit_keys();
371                let resp = self
372                    .client
373                    .request_with_ustr_keys(method, url, headers, body, None, Some(rate_keys))
374                    .await?;
375
376                if resp.status.is_success() {
377                    serde_json::from_slice(&resp.body).map_err(Into::into)
378                } else if let Ok(error_resp) =
379                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
380                {
381                    Err(error_resp.into())
382                } else {
383                    Err(BitmexHttpError::UnexpectedStatus {
384                        status: StatusCode::from_u16(resp.status.as_u16())
385                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
386                        body: String::from_utf8_lossy(&resp.body).to_string(),
387                    })
388                }
389            }
390        };
391
392        // Retry strategy based on BitMEX error responses and HTTP status codes:
393        //
394        // 1. Network errors: always retry (transient connection issues)
395        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
396        // 3. BitMEX JSON errors with specific handling:
397        //    - "RateLimitError": explicit rate limit error from BitMEX
398        //    - "HTTPError": generic error name used by BitMEX for various issues
399        //      Only retry if message contains "rate limit" to avoid retrying
400        //      non-transient errors like authentication failures, validation errors,
401        //      insufficient balance, etc. which also return as "HTTPError"
402        //
403        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
404        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
405        // be retried. We only retry when the message explicitly mentions rate limiting.
406        //
407        // See tests in tests/http.rs for retry behavior validation
408        let should_retry = |error: &BitmexHttpError| -> bool {
409            match error {
410                BitmexHttpError::NetworkError(_) => true,
411                BitmexHttpError::UnexpectedStatus { status, .. } => {
412                    status.as_u16() >= 500 || status.as_u16() == 429
413                }
414                BitmexHttpError::BitmexError {
415                    error_name,
416                    message,
417                } => {
418                    error_name == "RateLimitError"
419                        || (error_name == "HTTPError"
420                            && message.to_lowercase().contains("rate limit"))
421                }
422                _ => false,
423            }
424        };
425
426        let create_error = |msg: String| -> BitmexHttpError {
427            if msg == "canceled" {
428                BitmexHttpError::NetworkError("Request canceled".to_string())
429            } else {
430                BitmexHttpError::NetworkError(msg)
431            }
432        };
433
434        self.retry_manager
435            .execute_with_retry_with_cancel(
436                endpoint.as_str(),
437                operation,
438                should_retry,
439                create_error,
440                &self.cancellation_token,
441            )
442            .await
443    }
444
445    /// Get all instruments.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
450    pub async fn http_get_instruments(
451        &self,
452        active_only: bool,
453    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
454        let path = if active_only {
455            "/instrument/active"
456        } else {
457            "/instrument"
458        };
459        self.send_request(Method::GET, path, None, false).await
460    }
461
462    /// Requests the current server time from BitMEX.
463    ///
464    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
465    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the HTTP request fails or if the response body
470    /// cannot be parsed into [`BitmexApiInfo`].
471    pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
472        let response: BitmexApiInfo = self.send_request(Method::GET, "", None, false).await?;
473        Ok(response.timestamp)
474    }
475
476    /// Get the instrument definition for the specified symbol.
477    ///
478    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
479    /// a single symbol is requested. This helper returns the first element of
480    /// that array and yields `Ok(None)` when the venue returns an empty list
481    /// (e.g. unknown symbol).
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the request fails or the payload cannot be deserialized.
486    pub async fn http_get_instrument(
487        &self,
488        symbol: &str,
489    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
490        let path = &format!("/instrument?symbol={symbol}");
491        let instruments: Vec<BitmexInstrument> =
492            self.send_request(Method::GET, path, None, false).await?;
493
494        Ok(instruments.into_iter().next())
495    }
496
497    /// Get user wallet information.
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
502    pub async fn http_get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
503        let endpoint = "/user/wallet";
504        self.send_request(Method::GET, endpoint, None, true).await
505    }
506
507    /// Get user margin information.
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
512    pub async fn http_get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
513        let path = format!("/user/margin?currency={currency}");
514        self.send_request(Method::GET, &path, None, true).await
515    }
516
517    /// Get historical trades.
518    ///
519    /// # Errors
520    ///
521    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
522    ///
523    /// # Panics
524    ///
525    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
526    pub async fn http_get_trades(
527        &self,
528        params: GetTradeParams,
529    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
530        let query = serde_urlencoded::to_string(&params).map_err(|e| {
531            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
532        })?;
533        let path = format!("/trade?{query}");
534        self.send_request(Method::GET, &path, None, true).await
535    }
536
537    /// Get bucketed (aggregated) trade data.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
542    pub async fn http_get_trade_bucketed(
543        &self,
544        params: GetTradeBucketedParams,
545    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
546        let query = serde_urlencoded::to_string(&params).map_err(|e| {
547            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
548        })?;
549        let path = format!("/trade/bucketed?{query}");
550        self.send_request(Method::GET, &path, None, true).await
551    }
552
553    /// Get user orders.
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
558    ///
559    /// # Panics
560    ///
561    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
562    pub async fn http_get_orders(
563        &self,
564        params: GetOrderParams,
565    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
566        let query = serde_urlencoded::to_string(&params).map_err(|e| {
567            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
568        })?;
569        let path = format!("/order?{query}");
570        self.send_request(Method::GET, &path, None, true).await
571    }
572
573    /// Place a new order.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
578    ///
579    /// # Panics
580    ///
581    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
582    pub async fn http_place_order(
583        &self,
584        params: PostOrderParams,
585    ) -> Result<Value, BitmexHttpError> {
586        // BitMEX spec requires form-encoded body for POST /order
587        let body = serde_urlencoded::to_string(&params)
588            .map_err(|e| {
589                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
590            })?
591            .into_bytes();
592        let path = "/order";
593        self.send_request(Method::POST, path, Some(body), true)
594            .await
595    }
596
597    /// Cancel user orders.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
602    ///
603    /// # Panics
604    ///
605    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
606    pub async fn http_cancel_orders(
607        &self,
608        params: DeleteOrderParams,
609    ) -> Result<Value, BitmexHttpError> {
610        // BitMEX spec requires form-encoded body for DELETE /order
611        let body = serde_urlencoded::to_string(&params)
612            .map_err(|e| {
613                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
614            })?
615            .into_bytes();
616        let path = "/order";
617        self.send_request(Method::DELETE, path, Some(body), true)
618            .await
619    }
620
621    /// Amend an existing order.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
626    ///
627    /// # Panics
628    ///
629    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
630    pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
631        // BitMEX spec requires form-encoded body for PUT /order
632        let body = serde_urlencoded::to_string(&params)
633            .map_err(|e| {
634                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
635            })?
636            .into_bytes();
637        let path = "/order";
638        self.send_request(Method::PUT, path, Some(body), true).await
639    }
640
641    /// Cancel all orders.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
646    ///
647    /// # Panics
648    ///
649    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
650    ///
651    /// # References
652    ///
653    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
654    pub async fn http_cancel_all_orders(
655        &self,
656        params: DeleteAllOrdersParams,
657    ) -> Result<Value, BitmexHttpError> {
658        let query = serde_urlencoded::to_string(&params).map_err(|e| {
659            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
660        })?;
661        let path = format!("/order/all?{query}");
662        self.send_request(Method::DELETE, &path, None, true).await
663    }
664
665    /// Get user executions.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
670    ///
671    /// # Panics
672    ///
673    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
674    pub async fn http_get_executions(
675        &self,
676        params: GetExecutionParams,
677    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
678        let query = serde_urlencoded::to_string(&params).map_err(|e| {
679            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
680        })?;
681        let path = format!("/execution/tradeHistory?{query}");
682        self.send_request(Method::GET, &path, None, true).await
683    }
684
685    /// Get user positions.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
690    ///
691    /// # Panics
692    ///
693    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
694    pub async fn http_get_positions(
695        &self,
696        params: GetPositionParams,
697    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
698        let query = serde_urlencoded::to_string(&params).map_err(|e| {
699            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700        })?;
701        let path = format!("/position?{query}");
702        self.send_request(Method::GET, &path, None, true).await
703    }
704
705    /// Update position leverage.
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
710    ///
711    /// # Panics
712    ///
713    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
714    pub async fn http_update_position_leverage(
715        &self,
716        params: PostPositionLeverageParams,
717    ) -> Result<BitmexPosition, BitmexHttpError> {
718        // BitMEX spec requires form-encoded body for POST endpoints
719        let body = serde_urlencoded::to_string(&params)
720            .map_err(|e| {
721                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
722            })?
723            .into_bytes();
724        let path = "/position/leverage";
725        self.send_request(Method::POST, path, Some(body), true)
726            .await
727    }
728}
729
730/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
731///
732/// This is the high-level client that wraps the inner client and provides
733/// Nautilus-specific functionality for trading operations.
734#[derive(Clone, Debug)]
735#[cfg_attr(
736    feature = "python",
737    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
738)]
739pub struct BitmexHttpClient {
740    inner: Arc<BitmexHttpInnerClient>,
741    instruments_cache: Arc<Mutex<AHashMap<Ustr, InstrumentAny>>>,
742}
743
744impl Default for BitmexHttpClient {
745    fn default() -> Self {
746        Self::new(
747            None,
748            None,
749            None,
750            false,
751            Some(60),
752            None,
753            None,
754            None,
755            None,
756            None,
757            None,
758        )
759        .expect("Failed to create default BitmexHttpClient")
760    }
761}
762
763impl BitmexHttpClient {
764    /// Creates a new [`BitmexHttpClient`] instance.
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if the HTTP client cannot be created.
769    #[allow(clippy::too_many_arguments)]
770    pub fn new(
771        base_url: Option<String>,
772        api_key: Option<String>,
773        api_secret: Option<String>,
774        testnet: bool,
775        timeout_secs: Option<u64>,
776        max_retries: Option<u32>,
777        retry_delay_ms: Option<u64>,
778        retry_delay_max_ms: Option<u64>,
779        recv_window_ms: Option<u64>,
780        max_requests_per_second: Option<u32>,
781        max_requests_per_minute: Option<u32>,
782    ) -> Result<Self, BitmexHttpError> {
783        // Determine the base URL
784        let url = base_url.unwrap_or_else(|| {
785            if testnet {
786                BITMEX_HTTP_TESTNET_URL.to_string()
787            } else {
788                BITMEX_HTTP_URL.to_string()
789            }
790        });
791
792        let inner = match (api_key, api_secret) {
793            (Some(key), Some(secret)) => BitmexHttpInnerClient::with_credentials(
794                key,
795                secret,
796                url,
797                timeout_secs,
798                max_retries,
799                retry_delay_ms,
800                retry_delay_max_ms,
801                recv_window_ms,
802                max_requests_per_second,
803                max_requests_per_minute,
804            )?,
805            _ => BitmexHttpInnerClient::new(
806                Some(url),
807                timeout_secs,
808                max_retries,
809                retry_delay_ms,
810                retry_delay_max_ms,
811                recv_window_ms,
812                max_requests_per_second,
813                max_requests_per_minute,
814            )?,
815        };
816
817        Ok(Self {
818            inner: Arc::new(inner),
819            instruments_cache: Arc::new(Mutex::new(AHashMap::new())),
820        })
821    }
822
823    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
824    /// the default BitMEX HTTP base URL.
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if required environment variables are not set or invalid.
829    pub fn from_env() -> anyhow::Result<Self> {
830        Self::with_credentials(None, None, None, None, None, None, None, None, None, None)
831            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
832    }
833
834    /// Creates a new [`BitmexHttpClient`] configured with credentials
835    /// for authenticated requests.
836    ///
837    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
838    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if one credential is provided without the other.
843    #[allow(clippy::too_many_arguments)]
844    pub fn with_credentials(
845        api_key: Option<String>,
846        api_secret: Option<String>,
847        base_url: Option<String>,
848        timeout_secs: Option<u64>,
849        max_retries: Option<u32>,
850        retry_delay_ms: Option<u64>,
851        retry_delay_max_ms: Option<u64>,
852        recv_window_ms: Option<u64>,
853        max_requests_per_second: Option<u32>,
854        max_requests_per_minute: Option<u32>,
855    ) -> anyhow::Result<Self> {
856        // Determine testnet from URL first to select correct environment variables
857        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
858
859        // Choose environment variables based on testnet flag
860        let (key_var, secret_var) = if testnet {
861            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
862        } else {
863            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
864        };
865
866        let api_key = api_key.or_else(|| get_env_var(key_var).ok());
867        let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
868
869        // If we're trying to create an authenticated client, we need both key and secret
870        if api_key.is_some() && api_secret.is_none() {
871            anyhow::bail!("{secret_var} is required when {key_var} is provided");
872        }
873        if api_key.is_none() && api_secret.is_some() {
874            anyhow::bail!("{key_var} is required when {secret_var} is provided");
875        }
876
877        Self::new(
878            base_url,
879            api_key,
880            api_secret,
881            testnet,
882            timeout_secs,
883            max_retries,
884            retry_delay_ms,
885            retry_delay_max_ms,
886            recv_window_ms,
887            max_requests_per_second,
888            max_requests_per_minute,
889        )
890        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
891    }
892
893    /// Returns the base url being used by the client.
894    #[must_use]
895    pub fn base_url(&self) -> &str {
896        self.inner.base_url.as_str()
897    }
898
899    /// Returns the public API key being used by the client.
900    #[must_use]
901    pub fn api_key(&self) -> Option<&str> {
902        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
903    }
904
905    /// Requests the current server time from BitMEX.
906    ///
907    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
912    pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
913        self.inner.http_get_server_time().await
914    }
915
916    /// Generates a timestamp for initialization.
917    fn generate_ts_init(&self) -> UnixNanos {
918        get_atomic_clock_realtime().get_time_ns()
919    }
920
921    /// Check if the order has a contingency type that requires linking.
922    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
923        matches!(
924            contingency_type,
925            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
926        )
927    }
928
929    /// Check if the order is a parent in contingency relationships.
930    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
931        matches!(
932            contingency_type,
933            ContingencyType::Oco | ContingencyType::Oto
934        )
935    }
936
937    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
938    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
939        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
940        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
941        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
942        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
943
944        for report in reports.iter() {
945            let Some(client_order_id) = report.client_order_id else {
946                continue;
947            };
948
949            if let Some(order_list_id) = report.order_list_id {
950                order_list_groups
951                    .entry(order_list_id)
952                    .or_default()
953                    .push(client_order_id);
954
955                if Self::is_parent_contingency(report.contingency_type) {
956                    order_list_parents
957                        .entry(order_list_id)
958                        .or_insert(client_order_id);
959                }
960            }
961
962            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
963                && Self::is_contingent_order(report.contingency_type)
964            {
965                prefix_groups
966                    .entry(base.to_owned())
967                    .or_default()
968                    .push(client_order_id);
969
970                if Self::is_parent_contingency(report.contingency_type) {
971                    prefix_parents
972                        .entry(base.to_owned())
973                        .or_insert(client_order_id);
974                }
975            }
976        }
977
978        for report in reports.iter_mut() {
979            let Some(client_order_id) = report.client_order_id else {
980                continue;
981            };
982
983            if report.linked_order_ids.is_some() {
984                continue;
985            }
986
987            // Only process contingent orders
988            if !Self::is_contingent_order(report.contingency_type) {
989                continue;
990            }
991
992            if let Some(order_list_id) = report.order_list_id
993                && let Some(group) = order_list_groups.get(&order_list_id)
994            {
995                let mut linked: Vec<ClientOrderId> = group
996                    .iter()
997                    .copied()
998                    .filter(|candidate| candidate != &client_order_id)
999                    .collect();
1000
1001                if !linked.is_empty() {
1002                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1003                        if client_order_id != *parent_id {
1004                            linked.sort_by_key(
1005                                |candidate| {
1006                                    if candidate == parent_id { 0 } else { 1 }
1007                                },
1008                            );
1009                            report.parent_order_id = Some(*parent_id);
1010                        } else {
1011                            report.parent_order_id = None;
1012                        }
1013                    } else {
1014                        report.parent_order_id = None;
1015                    }
1016
1017                    tracing::trace!(
1018                        client_order_id = ?client_order_id,
1019                        order_list_id = ?order_list_id,
1020                        contingency_type = ?report.contingency_type,
1021                        linked_order_ids = ?linked,
1022                        "BitMEX linked ids sourced from order list id",
1023                    );
1024                    report.linked_order_ids = Some(linked);
1025                    continue;
1026                }
1027
1028                tracing::trace!(
1029                    client_order_id = ?client_order_id,
1030                    order_list_id = ?order_list_id,
1031                    contingency_type = ?report.contingency_type,
1032                    order_list_group = ?group,
1033                    "BitMEX order list id group had no peers",
1034                );
1035                report.parent_order_id = None;
1036            } else if report.order_list_id.is_none() {
1037                report.parent_order_id = None;
1038            }
1039
1040            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1041                && let Some(group) = prefix_groups.get(base)
1042            {
1043                let mut linked: Vec<ClientOrderId> = group
1044                    .iter()
1045                    .copied()
1046                    .filter(|candidate| candidate != &client_order_id)
1047                    .collect();
1048
1049                if !linked.is_empty() {
1050                    if let Some(parent_id) = prefix_parents.get(base) {
1051                        if client_order_id != *parent_id {
1052                            linked.sort_by_key(
1053                                |candidate| {
1054                                    if candidate == parent_id { 0 } else { 1 }
1055                                },
1056                            );
1057                            report.parent_order_id = Some(*parent_id);
1058                        } else {
1059                            report.parent_order_id = None;
1060                        }
1061                    } else {
1062                        report.parent_order_id = None;
1063                    }
1064
1065                    tracing::trace!(
1066                        client_order_id = ?client_order_id,
1067                        contingency_type = ?report.contingency_type,
1068                        base = base,
1069                        linked_order_ids = ?linked,
1070                        "BitMEX linked ids constructed from client order id prefix",
1071                    );
1072                    report.linked_order_ids = Some(linked);
1073                    continue;
1074                }
1075
1076                tracing::trace!(
1077                    client_order_id = ?client_order_id,
1078                    contingency_type = ?report.contingency_type,
1079                    base = base,
1080                    prefix_group = ?group,
1081                    "BitMEX client order id prefix group had no peers",
1082                );
1083                report.parent_order_id = None;
1084            } else if client_order_id.as_str().contains('-') {
1085                report.parent_order_id = None;
1086            }
1087
1088            if Self::is_contingent_order(report.contingency_type) {
1089                tracing::warn!(
1090                    client_order_id = ?report.client_order_id,
1091                    order_list_id = ?report.order_list_id,
1092                    contingency_type = ?report.contingency_type,
1093                    "BitMEX order status report missing linked ids after grouping",
1094                );
1095                report.contingency_type = ContingencyType::NoContingency;
1096                report.parent_order_id = None;
1097            }
1098
1099            report.linked_order_ids = None;
1100        }
1101    }
1102
1103    /// Cancel all pending HTTP requests.
1104    pub fn cancel_all_requests(&self) {
1105        self.inner.cancel_all_requests();
1106    }
1107
1108    /// Get the cancellation token for this client.
1109    pub fn cancellation_token(&self) -> CancellationToken {
1110        self.inner.cancellation_token().clone()
1111    }
1112
1113    /// Adds an instrument to the cache for precision lookups.
1114    ///
1115    /// # Panics
1116    ///
1117    /// Panics if the instruments cache mutex is poisoned.
1118    pub fn add_instrument(&self, instrument: InstrumentAny) {
1119        self.instruments_cache
1120            .lock()
1121            .unwrap()
1122            .insert(instrument.raw_symbol().inner(), instrument);
1123    }
1124
1125    /// Request a single instrument and parse it into a Nautilus type.
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1130    /// successfully, `Ok(None)` when the instrument is unknown or the payload
1131    /// cannot be converted into a Nautilus `Instrument`.
1132    pub async fn request_instrument(
1133        &self,
1134        instrument_id: InstrumentId,
1135    ) -> anyhow::Result<Option<InstrumentAny>> {
1136        let response = self
1137            .inner
1138            .http_get_instrument(instrument_id.symbol.as_str())
1139            .await?;
1140
1141        let instrument = match response {
1142            Some(instrument) => instrument,
1143            None => return Ok(None),
1144        };
1145
1146        let ts_init = self.generate_ts_init();
1147
1148        Ok(parse_instrument_any(&instrument, ts_init))
1149    }
1150
1151    /// Request all available instruments and parse them into Nautilus types.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the HTTP request fails or parsing fails.
1156    pub async fn request_instruments(
1157        &self,
1158        active_only: bool,
1159    ) -> anyhow::Result<Vec<InstrumentAny>> {
1160        let instruments = self.inner.http_get_instruments(active_only).await?;
1161        let ts_init = self.generate_ts_init();
1162
1163        let mut parsed_instruments = Vec::new();
1164        for inst in instruments {
1165            if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1166                parsed_instruments.push(instrument_any);
1167            }
1168        }
1169
1170        Ok(parsed_instruments)
1171    }
1172
1173    /// Get user wallet information.
1174    ///
1175    /// # Errors
1176    ///
1177    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1178    ///
1179    /// # Panics
1180    ///
1181    /// Panics if the inner mutex is poisoned.
1182    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1183        let inner = self.inner.clone();
1184        inner.http_get_wallet().await
1185    }
1186
1187    /// Get user orders.
1188    ///
1189    /// # Errors
1190    ///
1191    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1192    ///
1193    /// # Panics
1194    ///
1195    /// Panics if the inner mutex is poisoned.
1196    pub async fn get_orders(
1197        &self,
1198        params: GetOrderParams,
1199    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1200        let inner = self.inner.clone();
1201        inner.http_get_orders(params).await
1202    }
1203
1204    /// Place a new order with raw API params.
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
1209    ///
1210    /// # Panics
1211    ///
1212    /// Panics if the inner mutex is poisoned.
1213    pub async fn http_place_order(
1214        &self,
1215        params: PostOrderParams,
1216    ) -> Result<Value, BitmexHttpError> {
1217        let inner = self.inner.clone();
1218        inner.http_place_order(params).await
1219    }
1220
1221    /// Cancel user orders with raw API params.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
1226    ///
1227    /// # Panics
1228    ///
1229    /// Panics if the inner mutex is poisoned.
1230    pub async fn http_cancel_orders(
1231        &self,
1232        params: DeleteOrderParams,
1233    ) -> Result<Value, BitmexHttpError> {
1234        let inner = self.inner.clone();
1235        inner.http_cancel_orders(params).await
1236    }
1237
1238    /// Amend an existing order with raw API params.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
1243    ///
1244    /// # Panics
1245    ///
1246    /// Panics if the inner mutex is poisoned.
1247    pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
1248        let inner = self.inner.clone();
1249        inner.http_amend_order(params).await
1250    }
1251
1252    /// Cancel all orders with raw API params.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1257    ///
1258    /// # Panics
1259    ///
1260    /// Panics if the inner mutex is poisoned.
1261    ///
1262    /// # References
1263    ///
1264    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
1265    pub async fn http_cancel_all_orders(
1266        &self,
1267        params: DeleteAllOrdersParams,
1268    ) -> Result<Value, BitmexHttpError> {
1269        let inner = self.inner.clone();
1270        inner.http_cancel_all_orders(params).await
1271    }
1272
1273    /// Get price precision for a symbol from the instruments cache (if found).
1274    ///
1275    /// # Errors
1276    ///
1277    /// Returns an error if the instrument is not found in the cache.
1278    ///
1279    /// # Panics
1280    ///
1281    /// Panics if the instruments cache mutex is poisoned.
1282    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1283        let cache = self.instruments_cache.lock().unwrap();
1284        cache.get(&symbol).cloned().ok_or_else(|| {
1285            anyhow::anyhow!(
1286                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1287            )
1288        })
1289    }
1290
1291    /// Returns the cached price precision for the given symbol.
1292    ///
1293    /// # Errors
1294    ///
1295    /// Returns an error if the instrument was never cached (for example, if
1296    /// instruments were not loaded prior to use).
1297    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1298        self.instrument_from_cache(symbol)
1299            .map(|instrument| instrument.price_precision())
1300    }
1301
1302    /// Get user margin information.
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1307    pub async fn http_get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1308        self.inner
1309            .http_get_margin(currency)
1310            .await
1311            .map_err(|e| anyhow::anyhow!(e))
1312    }
1313
1314    /// Request account state for the given account.
1315    ///
1316    /// # Errors
1317    ///
1318    /// Returns an error if the HTTP request fails or no account state is returned.
1319    pub async fn request_account_state(
1320        &self,
1321        account_id: AccountId,
1322    ) -> anyhow::Result<AccountState> {
1323        // Get margin data for XBt (Bitcoin) by default
1324        let margin = self
1325            .inner
1326            .http_get_margin("XBt")
1327            .await
1328            .map_err(|e| anyhow::anyhow!(e))?;
1329
1330        let ts_init = nautilus_core::nanos::UnixNanos::from(
1331            chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1332        );
1333
1334        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1335        let margin_msg = BitmexMarginMsg {
1336            account: margin.account,
1337            currency: margin.currency,
1338            risk_limit: margin.risk_limit,
1339            amount: margin.amount,
1340            prev_realised_pnl: margin.prev_realised_pnl,
1341            gross_comm: margin.gross_comm,
1342            gross_open_cost: margin.gross_open_cost,
1343            gross_open_premium: margin.gross_open_premium,
1344            gross_exec_cost: margin.gross_exec_cost,
1345            gross_mark_value: margin.gross_mark_value,
1346            risk_value: margin.risk_value,
1347            init_margin: margin.init_margin,
1348            maint_margin: margin.maint_margin,
1349            target_excess_margin: margin.target_excess_margin,
1350            realised_pnl: margin.realised_pnl,
1351            unrealised_pnl: margin.unrealised_pnl,
1352            wallet_balance: margin.wallet_balance,
1353            margin_balance: margin.margin_balance,
1354            margin_leverage: margin.margin_leverage,
1355            margin_used_pcnt: margin.margin_used_pcnt,
1356            excess_margin: margin.excess_margin,
1357            available_margin: margin.available_margin,
1358            withdrawable_margin: margin.withdrawable_margin,
1359            maker_fee_discount: None, // Not in HTTP response
1360            taker_fee_discount: None, // Not in HTTP response
1361            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1362            foreign_margin_balance: None,
1363            foreign_requirement: None,
1364        };
1365
1366        parse_account_state(&margin_msg, account_id, ts_init)
1367    }
1368
1369    /// Submit a new order.
1370    ///
1371    /// # Errors
1372    ///
1373    /// Returns an error if credentials are missing, the request fails, order validation fails,
1374    /// the order is rejected, or the API returns an error.
1375    #[allow(clippy::too_many_arguments)]
1376    pub async fn submit_order(
1377        &self,
1378        instrument_id: InstrumentId,
1379        client_order_id: ClientOrderId,
1380        order_side: OrderSide,
1381        order_type: OrderType,
1382        quantity: Quantity,
1383        time_in_force: TimeInForce,
1384        price: Option<Price>,
1385        trigger_price: Option<Price>,
1386        trigger_type: Option<TriggerType>,
1387        display_qty: Option<Quantity>,
1388        post_only: bool,
1389        reduce_only: bool,
1390        order_list_id: Option<OrderListId>,
1391        contingency_type: Option<ContingencyType>,
1392    ) -> anyhow::Result<OrderStatusReport> {
1393        use crate::common::enums::{
1394            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1395        };
1396
1397        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1398
1399        let mut params = super::query::PostOrderParamsBuilder::default();
1400        params.text(NAUTILUS_TRADER);
1401        params.symbol(instrument_id.symbol.as_str());
1402        params.cl_ord_id(client_order_id.as_str());
1403
1404        let side = BitmexSide::try_from_order_side(order_side)?;
1405        params.side(side);
1406
1407        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1408        params.ord_type(ord_type);
1409
1410        params.order_qty(quantity_to_u32(&quantity, &instrument));
1411
1412        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1413        params.time_in_force(tif);
1414
1415        if let Some(price) = price {
1416            params.price(price.as_f64());
1417        }
1418
1419        if let Some(trigger_price) = trigger_price {
1420            params.stop_px(trigger_price.as_f64());
1421        }
1422
1423        if let Some(display_qty) = display_qty {
1424            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1425        }
1426
1427        if let Some(order_list_id) = order_list_id {
1428            params.cl_ord_link_id(order_list_id.as_str());
1429        }
1430
1431        let mut exec_inst = Vec::new();
1432
1433        if post_only {
1434            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1435        }
1436
1437        if reduce_only {
1438            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1439        }
1440
1441        if trigger_price.is_some()
1442            && let Some(trigger_type) = trigger_type
1443        {
1444            match trigger_type {
1445                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1446                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1447                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1448                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1449            }
1450        }
1451
1452        if !exec_inst.is_empty() {
1453            params.exec_inst(exec_inst);
1454        }
1455
1456        if let Some(contingency_type) = contingency_type {
1457            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1458            params.contingency_type(bitmex_contingency);
1459        }
1460
1461        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1462
1463        let response = self.inner.http_place_order(params).await?;
1464
1465        let order: BitmexOrder = serde_json::from_value(response)?;
1466
1467        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1468            let reason = order
1469                .ord_rej_reason
1470                .map(|r| r.to_string())
1471                .unwrap_or_else(|| "No reason provided".to_string());
1472            anyhow::bail!("Order rejected: {reason}");
1473        }
1474
1475        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1476        let ts_init = self.generate_ts_init();
1477
1478        parse_order_status_report(&order, &instrument, ts_init)
1479    }
1480
1481    /// Cancel an order.
1482    ///
1483    /// # Errors
1484    ///
1485    /// Returns an error if:
1486    /// - Credentials are missing.
1487    /// - The request fails.
1488    /// - The order doesn't exist.
1489    /// - The API returns an error.
1490    pub async fn cancel_order(
1491        &self,
1492        instrument_id: InstrumentId,
1493        client_order_id: Option<ClientOrderId>,
1494        venue_order_id: Option<VenueOrderId>,
1495    ) -> anyhow::Result<OrderStatusReport> {
1496        let mut params = super::query::DeleteOrderParamsBuilder::default();
1497        params.text(NAUTILUS_TRADER);
1498
1499        if let Some(venue_order_id) = venue_order_id {
1500            params.order_id(vec![venue_order_id.as_str().to_string()]);
1501        } else if let Some(client_order_id) = client_order_id {
1502            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1503        } else {
1504            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1505        }
1506
1507        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1508
1509        let response = self.inner.http_cancel_orders(params).await?;
1510
1511        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1512        let order = orders
1513            .into_iter()
1514            .next()
1515            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1516
1517        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1518        let ts_init = self.generate_ts_init();
1519
1520        parse_order_status_report(&order, &instrument, ts_init)
1521    }
1522
1523    /// Cancel multiple orders.
1524    ///
1525    /// # Errors
1526    ///
1527    /// Returns an error if:
1528    /// - Credentials are missing.
1529    /// - The request fails.
1530    /// - The order doesn't exist.
1531    /// - The API returns an error.
1532    pub async fn cancel_orders(
1533        &self,
1534        instrument_id: InstrumentId,
1535        client_order_ids: Option<Vec<ClientOrderId>>,
1536        venue_order_ids: Option<Vec<VenueOrderId>>,
1537    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1538        let mut params = super::query::DeleteOrderParamsBuilder::default();
1539        params.text(NAUTILUS_TRADER);
1540
1541        // BitMEX API requires either client order IDs or venue order IDs, not both
1542        // Prioritize venue order IDs if both are provided
1543        if let Some(venue_order_ids) = venue_order_ids {
1544            if venue_order_ids.is_empty() {
1545                anyhow::bail!("venue_order_ids cannot be empty");
1546            }
1547            params.order_id(
1548                venue_order_ids
1549                    .iter()
1550                    .map(|id| id.to_string())
1551                    .collect::<Vec<_>>(),
1552            );
1553        } else if let Some(client_order_ids) = client_order_ids {
1554            if client_order_ids.is_empty() {
1555                anyhow::bail!("client_order_ids cannot be empty");
1556            }
1557            params.cl_ord_id(
1558                client_order_ids
1559                    .iter()
1560                    .map(|id| id.to_string())
1561                    .collect::<Vec<_>>(),
1562            );
1563        } else {
1564            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1565        }
1566
1567        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1568
1569        let response = self.inner.http_cancel_orders(params).await?;
1570
1571        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1572
1573        let ts_init = self.generate_ts_init();
1574        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1575
1576        let mut reports = Vec::new();
1577
1578        for order in orders {
1579            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1580        }
1581
1582        Self::populate_linked_order_ids(&mut reports);
1583
1584        Ok(reports)
1585    }
1586
1587    /// Cancel all orders for an instrument and optionally an order side.
1588    ///
1589    /// # Errors
1590    ///
1591    /// Returns an error if:
1592    /// - Credentials are missing.
1593    /// - The request fails.
1594    /// - The order doesn't exist.
1595    /// - The API returns an error.
1596    pub async fn cancel_all_orders(
1597        &self,
1598        instrument_id: InstrumentId,
1599        order_side: Option<OrderSide>,
1600    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1601        let mut params = DeleteAllOrdersParamsBuilder::default();
1602        params.text(NAUTILUS_TRADER);
1603        params.symbol(instrument_id.symbol.as_str());
1604
1605        if let Some(side) = order_side {
1606            let side = BitmexSide::try_from_order_side(side)?;
1607            params.filter(serde_json::json!({
1608                "side": side
1609            }));
1610        }
1611
1612        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1613
1614        let response = self.inner.http_cancel_all_orders(params).await?;
1615
1616        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1617
1618        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1619        let ts_init = self.generate_ts_init();
1620
1621        let mut reports = Vec::new();
1622
1623        for order in orders {
1624            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1625        }
1626
1627        Self::populate_linked_order_ids(&mut reports);
1628
1629        Ok(reports)
1630    }
1631
1632    /// Modify an existing order.
1633    ///
1634    /// # Errors
1635    ///
1636    /// Returns an error if:
1637    /// - Credentials are missing.
1638    /// - The request fails.
1639    /// - The order doesn't exist.
1640    /// - The order is already closed.
1641    /// - The API returns an error.
1642    pub async fn modify_order(
1643        &self,
1644        instrument_id: InstrumentId,
1645        client_order_id: Option<ClientOrderId>,
1646        venue_order_id: Option<VenueOrderId>,
1647        quantity: Option<Quantity>,
1648        price: Option<Price>,
1649        trigger_price: Option<Price>,
1650    ) -> anyhow::Result<OrderStatusReport> {
1651        let mut params = PutOrderParamsBuilder::default();
1652        params.text(NAUTILUS_TRADER);
1653
1654        // Set order ID - prefer venue_order_id if available
1655        if let Some(venue_order_id) = venue_order_id {
1656            params.order_id(venue_order_id.as_str());
1657        } else if let Some(client_order_id) = client_order_id {
1658            params.orig_cl_ord_id(client_order_id.as_str());
1659        } else {
1660            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1661        }
1662
1663        if let Some(quantity) = quantity {
1664            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1665            params.order_qty(quantity_to_u32(&quantity, &instrument));
1666        }
1667
1668        if let Some(price) = price {
1669            params.price(price.as_f64());
1670        }
1671
1672        if let Some(trigger_price) = trigger_price {
1673            params.stop_px(trigger_price.as_f64());
1674        }
1675
1676        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1677
1678        let response = self.inner.http_amend_order(params).await?;
1679
1680        let order: BitmexOrder = serde_json::from_value(response)?;
1681
1682        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1683            let reason = order
1684                .ord_rej_reason
1685                .map(|r| r.to_string())
1686                .unwrap_or_else(|| "No reason provided".to_string());
1687            anyhow::bail!("Order modification rejected: {reason}");
1688        }
1689
1690        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1691        let ts_init = self.generate_ts_init();
1692
1693        parse_order_status_report(&order, &instrument, ts_init)
1694    }
1695
1696    /// Query a single order by client order ID or venue order ID.
1697    ///
1698    /// # Errors
1699    ///
1700    /// Returns an error if:
1701    /// - Credentials are missing.
1702    /// - The request fails.
1703    /// - The API returns an error.
1704    pub async fn query_order(
1705        &self,
1706        instrument_id: InstrumentId,
1707        client_order_id: Option<ClientOrderId>,
1708        venue_order_id: Option<VenueOrderId>,
1709    ) -> anyhow::Result<Option<OrderStatusReport>> {
1710        let mut params = GetOrderParamsBuilder::default();
1711
1712        let filter_json = if let Some(client_order_id) = client_order_id {
1713            serde_json::json!({
1714                "clOrdID": client_order_id.to_string()
1715            })
1716        } else if let Some(venue_order_id) = venue_order_id {
1717            serde_json::json!({
1718                "orderID": venue_order_id.to_string()
1719            })
1720        } else {
1721            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1722        };
1723
1724        params.filter(filter_json);
1725        params.count(1); // Only need one order
1726
1727        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1728
1729        let response = self.inner.http_get_orders(params).await?;
1730
1731        if response.is_empty() {
1732            return Ok(None);
1733        }
1734
1735        let order = &response[0];
1736
1737        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1738        let ts_init = self.generate_ts_init();
1739
1740        let report = parse_order_status_report(order, &instrument, ts_init)?;
1741
1742        Ok(Some(report))
1743    }
1744
1745    /// Request a single order status report.
1746    ///
1747    /// # Errors
1748    ///
1749    /// Returns an error if:
1750    /// - Credentials are missing.
1751    /// - The request fails.
1752    /// - The API returns an error.
1753    pub async fn request_order_status_report(
1754        &self,
1755        instrument_id: InstrumentId,
1756        client_order_id: Option<ClientOrderId>,
1757        venue_order_id: Option<VenueOrderId>,
1758    ) -> anyhow::Result<OrderStatusReport> {
1759        let mut params = GetOrderParamsBuilder::default();
1760        params.symbol(instrument_id.symbol.as_str());
1761
1762        if let Some(venue_order_id) = venue_order_id {
1763            params.filter(serde_json::json!({
1764                "orderID": venue_order_id.as_str()
1765            }));
1766        } else if let Some(client_order_id) = client_order_id {
1767            params.filter(serde_json::json!({
1768                "clOrdID": client_order_id.as_str()
1769            }));
1770        }
1771
1772        params.count(1i32);
1773        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775        let response = self.inner.http_get_orders(params).await?;
1776
1777        let order = response
1778            .into_iter()
1779            .next()
1780            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1781
1782        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1783        let ts_init = self.generate_ts_init();
1784
1785        parse_order_status_report(&order, &instrument, ts_init)
1786    }
1787
1788    /// Request multiple order status reports.
1789    ///
1790    /// # Errors
1791    ///
1792    /// Returns an error if:
1793    /// - Credentials are missing.
1794    /// - The request fails.
1795    /// - The API returns an error.
1796    pub async fn request_order_status_reports(
1797        &self,
1798        instrument_id: Option<InstrumentId>,
1799        open_only: bool,
1800        limit: Option<u32>,
1801    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1802        let mut params = GetOrderParamsBuilder::default();
1803
1804        if let Some(instrument_id) = &instrument_id {
1805            params.symbol(instrument_id.symbol.as_str());
1806        }
1807
1808        if open_only {
1809            params.filter(serde_json::json!({
1810                "open": true
1811            }));
1812        }
1813
1814        if let Some(limit) = limit {
1815            params.count(limit as i32);
1816        } else {
1817            params.count(500); // Default count to avoid empty query
1818        }
1819
1820        params.reverse(true); // Get newest orders first
1821
1822        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1823
1824        let response = self.inner.http_get_orders(params).await?;
1825
1826        let ts_init = self.generate_ts_init();
1827
1828        let mut reports = Vec::new();
1829
1830        for order in response {
1831            // Skip orders without symbol (can happen with query responses)
1832            let Some(symbol) = order.symbol else {
1833                tracing::warn!("Order response missing symbol, skipping");
1834                continue;
1835            };
1836
1837            let instrument = self.instrument_from_cache(symbol)?;
1838
1839            match parse_order_status_report(&order, &instrument, ts_init) {
1840                Ok(report) => reports.push(report),
1841                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1842            }
1843        }
1844
1845        Self::populate_linked_order_ids(&mut reports);
1846
1847        Ok(reports)
1848    }
1849
1850    /// Request trades for the given instrument.
1851    ///
1852    /// # Errors
1853    ///
1854    /// Returns an error if the HTTP request fails or parsing fails.
1855    pub async fn request_trades(
1856        &self,
1857        instrument_id: InstrumentId,
1858        start: Option<DateTime<Utc>>,
1859        end: Option<DateTime<Utc>>,
1860        limit: Option<u32>,
1861    ) -> anyhow::Result<Vec<TradeTick>> {
1862        let mut params = GetTradeParamsBuilder::default();
1863        params.symbol(instrument_id.symbol.as_str());
1864
1865        if let Some(start) = start {
1866            params.start_time(start);
1867        }
1868
1869        if let Some(end) = end {
1870            params.end_time(end);
1871        }
1872
1873        if let (Some(start), Some(end)) = (start, end) {
1874            anyhow::ensure!(
1875                start < end,
1876                "Invalid time range: start={start:?} end={end:?}",
1877            );
1878        }
1879
1880        if let Some(limit) = limit {
1881            let clamped_limit = limit.min(1000);
1882            if limit > 1000 {
1883                tracing::warn!(
1884                    limit,
1885                    clamped_limit,
1886                    "BitMEX trade request limit exceeds venue maximum; clamping",
1887                );
1888            }
1889            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1890        }
1891        params.reverse(false);
1892        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1893
1894        let response = self.inner.http_get_trades(params).await?;
1895
1896        let ts_init = self.generate_ts_init();
1897
1898        let mut parsed_trades = Vec::new();
1899
1900        for trade in response {
1901            if let Some(start) = start
1902                && trade.timestamp < start
1903            {
1904                continue;
1905            }
1906
1907            if let Some(end) = end
1908                && trade.timestamp > end
1909            {
1910                continue;
1911            }
1912
1913            let price_precision = self.get_price_precision(trade.symbol)?;
1914
1915            match parse_trade(trade, price_precision, ts_init) {
1916                Ok(trade) => parsed_trades.push(trade),
1917                Err(e) => tracing::error!("Failed to parse trade: {e}"),
1918            }
1919        }
1920
1921        Ok(parsed_trades)
1922    }
1923
1924    /// Request bars for the given bar type.
1925    ///
1926    /// # Errors
1927    ///
1928    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1929    /// unsupported by BitMEX.
1930    pub async fn request_bars(
1931        &self,
1932        mut bar_type: BarType,
1933        start: Option<DateTime<Utc>>,
1934        end: Option<DateTime<Utc>>,
1935        limit: Option<u32>,
1936        partial: bool,
1937    ) -> anyhow::Result<Vec<Bar>> {
1938        bar_type = bar_type.standard();
1939
1940        anyhow::ensure!(
1941            bar_type.aggregation_source() == AggregationSource::External,
1942            "Only EXTERNAL aggregation bars are supported"
1943        );
1944        anyhow::ensure!(
1945            bar_type.spec().price_type == PriceType::Last,
1946            "Only LAST price type bars are supported"
1947        );
1948        if let (Some(start), Some(end)) = (start, end) {
1949            anyhow::ensure!(
1950                start < end,
1951                "Invalid time range: start={start:?} end={end:?}"
1952            );
1953        }
1954
1955        let spec = bar_type.spec();
1956        let bin_size = match (spec.aggregation, spec.step.get()) {
1957            (BarAggregation::Minute, 1) => "1m",
1958            (BarAggregation::Minute, 5) => "5m",
1959            (BarAggregation::Hour, 1) => "1h",
1960            (BarAggregation::Day, 1) => "1d",
1961            _ => anyhow::bail!(
1962                "BitMEX does not support {}-{:?}-{:?} bars",
1963                spec.step.get(),
1964                spec.aggregation,
1965                spec.price_type,
1966            ),
1967        };
1968
1969        let instrument_id = bar_type.instrument_id();
1970        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1971
1972        let mut params = GetTradeBucketedParamsBuilder::default();
1973        params.symbol(instrument_id.symbol.as_str());
1974        params.bin_size(bin_size);
1975        if partial {
1976            params.partial(true);
1977        }
1978        if let Some(start) = start {
1979            params.start_time(start);
1980        }
1981        if let Some(end) = end {
1982            params.end_time(end);
1983        }
1984        if let Some(limit) = limit {
1985            let clamped_limit = limit.min(1000);
1986            if limit > 1000 {
1987                tracing::warn!(
1988                    limit,
1989                    clamped_limit,
1990                    "BitMEX bar request limit exceeds venue maximum; clamping",
1991                );
1992            }
1993            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1994        }
1995        params.reverse(false);
1996        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1997
1998        let response = self.inner.http_get_trade_bucketed(params).await?;
1999        let ts_init = self.generate_ts_init();
2000        let mut bars = Vec::new();
2001
2002        for bin in response {
2003            if let Some(start) = start
2004                && bin.timestamp < start
2005            {
2006                continue;
2007            }
2008            if let Some(end) = end
2009                && bin.timestamp > end
2010            {
2011                continue;
2012            }
2013            if bin.symbol != instrument_id.symbol.inner() {
2014                tracing::warn!(
2015                    symbol = %bin.symbol,
2016                    expected = %instrument_id.symbol,
2017                    "Skipping trade bin for unexpected symbol",
2018                );
2019                continue;
2020            }
2021
2022            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2023                Ok(bar) => bars.push(bar),
2024                Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2025            }
2026        }
2027
2028        Ok(bars)
2029    }
2030
2031    /// Request fill reports for the given instrument.
2032    ///
2033    /// # Errors
2034    ///
2035    /// Returns an error if the HTTP request fails or parsing fails.
2036    pub async fn request_fill_reports(
2037        &self,
2038        instrument_id: Option<InstrumentId>,
2039        limit: Option<u32>,
2040    ) -> anyhow::Result<Vec<FillReport>> {
2041        let mut params = GetExecutionParamsBuilder::default();
2042        if let Some(instrument_id) = instrument_id {
2043            params.symbol(instrument_id.symbol.as_str());
2044        }
2045        if let Some(limit) = limit {
2046            params.count(limit as i32);
2047        } else {
2048            params.count(500); // Default count
2049        }
2050        params.reverse(true); // Get newest fills first
2051
2052        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2053
2054        let response = self.inner.http_get_executions(params).await?;
2055
2056        let ts_init = self.generate_ts_init();
2057
2058        let mut reports = Vec::new();
2059
2060        for exec in response {
2061            // Skip executions without symbol (e.g., CancelReject)
2062            let Some(symbol) = exec.symbol else {
2063                tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2064                continue;
2065            };
2066            let symbol_str = symbol.to_string();
2067
2068            let instrument = match self.instrument_from_cache(symbol) {
2069                Ok(instrument) => instrument,
2070                Err(err) => {
2071                    tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {err}");
2072                    continue;
2073                }
2074            };
2075
2076            match parse_fill_report(exec, &instrument, ts_init) {
2077                Ok(report) => reports.push(report),
2078                Err(e) => {
2079                    // Log at debug level for expected skip cases
2080                    let error_msg = e.to_string();
2081                    if error_msg.starts_with("Skipping non-trade execution")
2082                        || error_msg.starts_with("Skipping execution without order_id")
2083                    {
2084                        tracing::debug!("{e}");
2085                    } else {
2086                        tracing::error!("Failed to parse fill report: {e}");
2087                    }
2088                }
2089            }
2090        }
2091
2092        Ok(reports)
2093    }
2094
2095    /// Request position reports.
2096    ///
2097    /// # Errors
2098    ///
2099    /// Returns an error if the HTTP request fails or parsing fails.
2100    pub async fn request_position_status_reports(
2101        &self,
2102    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2103        let params = GetPositionParamsBuilder::default()
2104            .count(500) // Default count
2105            .build()
2106            .map_err(|e| anyhow::anyhow!(e))?;
2107
2108        let response = self.inner.http_get_positions(params).await?;
2109
2110        let ts_init = self.generate_ts_init();
2111
2112        let mut reports = Vec::new();
2113
2114        for pos in response {
2115            let symbol = Ustr::from(pos.symbol.as_str());
2116            let instrument = match self.instrument_from_cache(symbol) {
2117                Ok(instrument) => instrument,
2118                Err(err) => {
2119                    tracing::error!(
2120                        symbol = pos.symbol.as_str(),
2121                        "Instrument not found in cache for position parsing: {err}"
2122                    );
2123                    continue;
2124                }
2125            };
2126
2127            match parse_position_report(pos, &instrument, ts_init) {
2128                Ok(report) => reports.push(report),
2129                Err(e) => tracing::error!("Failed to parse position report: {e}"),
2130            }
2131        }
2132
2133        Ok(reports)
2134    }
2135
2136    /// Update position leverage.
2137    ///
2138    /// # Errors
2139    ///
2140    /// - Credentials are missing.
2141    /// - The request fails.
2142    /// - The API returns an error.
2143    pub async fn update_position_leverage(
2144        &self,
2145        symbol: &str,
2146        leverage: f64,
2147    ) -> anyhow::Result<PositionStatusReport> {
2148        let params = PostPositionLeverageParams {
2149            symbol: symbol.to_string(),
2150            leverage,
2151            target_account_id: None,
2152        };
2153
2154        let response = self.inner.http_update_position_leverage(params).await?;
2155
2156        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2157        let ts_init = self.generate_ts_init();
2158
2159        parse_position_report(response, &instrument, ts_init)
2160    }
2161}
2162
2163////////////////////////////////////////////////////////////////////////////////
2164// Tests
2165////////////////////////////////////////////////////////////////////////////////
2166
2167#[cfg(test)]
2168mod tests {
2169    use nautilus_core::UUID4;
2170    use nautilus_model::enums::OrderStatus;
2171    use rstest::rstest;
2172    use serde_json::json;
2173
2174    use super::*;
2175
2176    fn build_report(
2177        client_order_id: &str,
2178        venue_order_id: &str,
2179        contingency_type: ContingencyType,
2180        order_list_id: Option<&str>,
2181    ) -> OrderStatusReport {
2182        let mut report = OrderStatusReport::new(
2183            AccountId::from("BITMEX-1"),
2184            InstrumentId::from("XBTUSD.BITMEX"),
2185            Some(ClientOrderId::from(client_order_id)),
2186            VenueOrderId::from(venue_order_id),
2187            OrderSide::Buy,
2188            OrderType::Limit,
2189            TimeInForce::Gtc,
2190            OrderStatus::Accepted,
2191            Quantity::new(100.0, 0),
2192            Quantity::default(),
2193            UnixNanos::from(1_u64),
2194            UnixNanos::from(1_u64),
2195            UnixNanos::from(1_u64),
2196            Some(UUID4::new()),
2197        );
2198
2199        if let Some(id) = order_list_id {
2200            report = report.with_order_list_id(OrderListId::from(id));
2201        }
2202
2203        report.with_contingency_type(contingency_type)
2204    }
2205
2206    #[rstest]
2207    fn test_sign_request_generates_correct_headers() {
2208        let client = BitmexHttpInnerClient::with_credentials(
2209            "test_api_key".to_string(),
2210            "test_api_secret".to_string(),
2211            "http://localhost:8080".to_string(),
2212            Some(60),
2213            None, // max_retries
2214            None, // retry_delay_ms
2215            None, // retry_delay_max_ms
2216            None, // recv_window_ms
2217            None, // max_requests_per_second
2218            None, // max_requests_per_minute
2219        )
2220        .expect("Failed to create test client");
2221
2222        let headers = client
2223            .sign_request(&Method::GET, "/api/v1/order", None)
2224            .unwrap();
2225
2226        assert!(headers.contains_key("api-key"));
2227        assert!(headers.contains_key("api-signature"));
2228        assert!(headers.contains_key("api-expires"));
2229        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2230    }
2231
2232    #[rstest]
2233    fn test_sign_request_with_body() {
2234        let client = BitmexHttpInnerClient::with_credentials(
2235            "test_api_key".to_string(),
2236            "test_api_secret".to_string(),
2237            "http://localhost:8080".to_string(),
2238            Some(60),
2239            None, // max_retries
2240            None, // retry_delay_ms
2241            None, // retry_delay_max_ms
2242            None, // recv_window_ms
2243            None, // max_requests_per_second
2244            None, // max_requests_per_minute
2245        )
2246        .expect("Failed to create test client");
2247
2248        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2249        let body_bytes = serde_json::to_vec(&body).unwrap();
2250
2251        let headers_without_body = client
2252            .sign_request(&Method::POST, "/api/v1/order", None)
2253            .unwrap();
2254        let headers_with_body = client
2255            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2256            .unwrap();
2257
2258        // Signatures should be different when body is included
2259        assert_ne!(
2260            headers_without_body.get("api-signature").unwrap(),
2261            headers_with_body.get("api-signature").unwrap()
2262        );
2263    }
2264
2265    #[rstest]
2266    fn test_sign_request_uses_custom_recv_window() {
2267        let client_default = BitmexHttpInnerClient::with_credentials(
2268            "test_api_key".to_string(),
2269            "test_api_secret".to_string(),
2270            "http://localhost:8080".to_string(),
2271            Some(60),
2272            None,
2273            None,
2274            None,
2275            None, // Use default recv_window_ms (10000ms = 10s)
2276            None, // max_requests_per_second
2277            None, // max_requests_per_minute
2278        )
2279        .expect("Failed to create test client");
2280
2281        let client_custom = BitmexHttpInnerClient::with_credentials(
2282            "test_api_key".to_string(),
2283            "test_api_secret".to_string(),
2284            "http://localhost:8080".to_string(),
2285            Some(60),
2286            None,
2287            None,
2288            None,
2289            Some(30_000), // 30 seconds
2290            None,         // max_requests_per_second
2291            None,         // max_requests_per_minute
2292        )
2293        .expect("Failed to create test client");
2294
2295        let headers_default = client_default
2296            .sign_request(&Method::GET, "/api/v1/order", None)
2297            .unwrap();
2298        let headers_custom = client_custom
2299            .sign_request(&Method::GET, "/api/v1/order", None)
2300            .unwrap();
2301
2302        // Parse expires timestamps
2303        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2304        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2305
2306        // Verify both are valid future timestamps
2307        let now = Utc::now().timestamp();
2308        assert!(expires_default > now);
2309        assert!(expires_custom > now);
2310
2311        // Custom window should be greater than default
2312        assert!(expires_custom > expires_default);
2313
2314        // The difference should be approximately 20 seconds (30s - 10s)
2315        // Allow wider tolerance for delays between calls on slow CI runners
2316        let diff = expires_custom - expires_default;
2317        assert!((18..=25).contains(&diff));
2318    }
2319
2320    #[rstest]
2321    fn test_populate_linked_order_ids_from_order_list() {
2322        let base = "O-20250922-002219-001-000";
2323        let entry = format!("{base}-1");
2324        let stop = format!("{base}-2");
2325        let take = format!("{base}-3");
2326
2327        let mut reports = vec![
2328            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2329            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2330            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2331        ];
2332
2333        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2334
2335        assert_eq!(
2336            reports[0].linked_order_ids,
2337            Some(vec![
2338                ClientOrderId::from(stop.as_str()),
2339                ClientOrderId::from(take.as_str()),
2340            ]),
2341        );
2342        assert_eq!(
2343            reports[1].linked_order_ids,
2344            Some(vec![
2345                ClientOrderId::from(entry.as_str()),
2346                ClientOrderId::from(take.as_str()),
2347            ]),
2348        );
2349        assert_eq!(
2350            reports[2].linked_order_ids,
2351            Some(vec![
2352                ClientOrderId::from(entry.as_str()),
2353                ClientOrderId::from(stop.as_str()),
2354            ]),
2355        );
2356    }
2357
2358    #[rstest]
2359    fn test_populate_linked_order_ids_from_id_prefix() {
2360        let base = "O-20250922-002220-001-000";
2361        let entry = format!("{base}-1");
2362        let stop = format!("{base}-2");
2363        let take = format!("{base}-3");
2364
2365        let mut reports = vec![
2366            build_report(&entry, "V-1", ContingencyType::Oto, None),
2367            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2368            build_report(&take, "V-3", ContingencyType::Ouo, None),
2369        ];
2370
2371        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2372
2373        assert_eq!(
2374            reports[0].linked_order_ids,
2375            Some(vec![
2376                ClientOrderId::from(stop.as_str()),
2377                ClientOrderId::from(take.as_str()),
2378            ]),
2379        );
2380        assert_eq!(
2381            reports[1].linked_order_ids,
2382            Some(vec![
2383                ClientOrderId::from(entry.as_str()),
2384                ClientOrderId::from(take.as_str()),
2385            ]),
2386        );
2387        assert_eq!(
2388            reports[2].linked_order_ids,
2389            Some(vec![
2390                ClientOrderId::from(entry.as_str()),
2391                ClientOrderId::from(stop.as_str()),
2392            ]),
2393        );
2394    }
2395
2396    #[rstest]
2397    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2398        let base = "O-20250922-002221-001-000";
2399        let entry = format!("{base}-1");
2400        let passive = format!("{base}-2");
2401
2402        let mut reports = vec![
2403            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2404            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2405        ];
2406
2407        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2408
2409        // Non-contingent orders should not be linked
2410        assert!(reports[0].linked_order_ids.is_none());
2411
2412        // A contingent order with no other contingent peers should have contingency reset
2413        assert!(reports[1].linked_order_ids.is_none());
2414        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2415    }
2416}