nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_env_var,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46        TimeInForce, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::HttpClient,
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use reqwest::{Method, StatusCode, header::USER_AGENT};
60use serde::{Deserialize, Serialize, de::DeserializeOwned};
61use serde_json::Value;
62use tokio_util::sync::CancellationToken;
63use ustr::Ustr;
64
65use super::{
66    error::{BitmexErrorResponse, BitmexHttpError},
67    models::{
68        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
69        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
70    },
71    query::{
72        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
73        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
74        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
75        PostPositionLeverageParams, PutOrderParams,
76    },
77};
78use crate::{
79    common::{
80        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
81        credential::Credential,
82        enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
83        parse::{parse_account_state, quantity_to_u32},
84    },
85    http::{
86        parse::{
87            parse_fill_report, parse_instrument_any, parse_order_status_report,
88            parse_position_report, parse_trade, parse_trade_bin,
89        },
90        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
91    },
92    websocket::messages::BitmexMarginMsg,
93};
94
// Default BitMEX REST API rate limits.
//
// BitMEX implements a dual-layer rate limiting system:
// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
// - Secondary limit: 10 requests per second burst limit for specific endpoints.

/// Default per-second burst limit, used when no explicit (non-zero) value is supplied.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Default per-minute limit applied when the client is created with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Default per-minute limit applied when the client is created without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
106
/// Represents a BitMEX HTTP response.
///
/// Generic over the element type `T`; the payload is a list of `T` stored under
/// the `data` field when (de)serialized.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The typed data returned by the BitMEX endpoint.
    pub data: Vec<T>,
}
113
/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to BitMEX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
///
/// # Connection Management
///
/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
/// reused for subsequent requests to minimize latency.
///
/// # Rate Limiting
///
/// BitMEX enforces the following rate limits:
/// - 120 requests per minute for authenticated users (30 for unauthenticated).
/// - 10 requests per second burst limit for certain endpoints (order management).
///
/// The client automatically respects these limits through the configured quota.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// URL prefix that every endpoint path is appended to when building a request URL.
    base_url: String,
    /// Underlying HTTP client, configured with the default headers and rate limit quotas.
    client: HttpClient,
    /// API credentials used for signing; `None` for a client limited to public endpoints.
    credential: Option<Credential>,
    /// Validity window in milliseconds used to derive the `api-expires` header
    /// (converted to whole seconds with integer division when signing).
    recv_window_ms: u64,
    /// Drives retry/backoff for every request sent through this client.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token handed to the retry manager so in-flight requests can be cancelled.
    cancellation_token: CancellationToken,
}
142
143impl Default for BitmexRawHttpClient {
144    fn default() -> Self {
145        Self::new(None, Some(60), None, None, None, None, None, None, None)
146            .expect("Failed to create default BitmexHttpInnerClient")
147    }
148}
149
150impl BitmexRawHttpClient {
151    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
152    /// optionally overridden with a custom base URL.
153    ///
154    /// This version of the client has **no credentials**, so it can only
155    /// call publicly accessible endpoints.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the retry manager cannot be created.
160    #[allow(clippy::too_many_arguments)]
161    pub fn new(
162        base_url: Option<String>,
163        timeout_secs: Option<u64>,
164        max_retries: Option<u32>,
165        retry_delay_ms: Option<u64>,
166        retry_delay_max_ms: Option<u64>,
167        recv_window_ms: Option<u64>,
168        max_requests_per_second: Option<u32>,
169        max_requests_per_minute: Option<u32>,
170        proxy_url: Option<String>,
171    ) -> Result<Self, BitmexHttpError> {
172        let retry_config = RetryConfig {
173            max_retries: max_retries.unwrap_or(3),
174            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
175            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
176            backoff_factor: 2.0,
177            jitter_ms: 1000,
178            operation_timeout_ms: Some(60_000),
179            immediate_first: false,
180            max_elapsed_ms: Some(180_000),
181        };
182
183        let retry_manager = RetryManager::new(retry_config);
184
185        let max_req_per_sec =
186            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
187        let max_req_per_min =
188            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
189
190        Ok(Self {
191            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
192            client: HttpClient::new(
193                Self::default_headers(),
194                vec![],
195                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
196                Some(Self::default_quota(max_req_per_sec)),
197                timeout_secs,
198                proxy_url,
199            )
200            .map_err(|e| {
201                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
202            })?,
203            credential: None,
204            recv_window_ms: recv_window_ms.unwrap_or(10_000),
205            retry_manager,
206            cancellation_token: CancellationToken::new(),
207        })
208    }
209
210    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
211    /// for authenticated requests, optionally using a custom base URL.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the retry manager cannot be created.
216    #[allow(clippy::too_many_arguments)]
217    pub fn with_credentials(
218        api_key: String,
219        api_secret: String,
220        base_url: String,
221        timeout_secs: Option<u64>,
222        max_retries: Option<u32>,
223        retry_delay_ms: Option<u64>,
224        retry_delay_max_ms: Option<u64>,
225        recv_window_ms: Option<u64>,
226        max_requests_per_second: Option<u32>,
227        max_requests_per_minute: Option<u32>,
228        proxy_url: Option<String>,
229    ) -> Result<Self, BitmexHttpError> {
230        let retry_config = RetryConfig {
231            max_retries: max_retries.unwrap_or(3),
232            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
233            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
234            backoff_factor: 2.0,
235            jitter_ms: 1000,
236            operation_timeout_ms: Some(60_000),
237            immediate_first: false,
238            max_elapsed_ms: Some(180_000),
239        };
240
241        let retry_manager = RetryManager::new(retry_config);
242
243        let max_req_per_sec =
244            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
245        let max_req_per_min =
246            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
247
248        Ok(Self {
249            base_url,
250            client: HttpClient::new(
251                Self::default_headers(),
252                vec![],
253                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
254                Some(Self::default_quota(max_req_per_sec)),
255                timeout_secs,
256                proxy_url,
257            )
258            .map_err(|e| {
259                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
260            })?,
261            credential: Some(Credential::new(api_key, api_secret)),
262            recv_window_ms: recv_window_ms.unwrap_or(10_000),
263            retry_manager,
264            cancellation_token: CancellationToken::new(),
265        })
266    }
267
268    fn default_headers() -> HashMap<String, String> {
269        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270    }
271
272    fn default_quota(max_requests_per_second: u32) -> Quota {
273        Quota::per_second(
274            NonZeroU32::new(max_requests_per_second)
275                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276        )
277    }
278
279    fn rate_limiter_quotas(
280        max_requests_per_second: u32,
281        max_requests_per_minute: u32,
282    ) -> Vec<(String, Quota)> {
283        let per_sec_quota = Quota::per_second(
284            NonZeroU32::new(max_requests_per_second)
285                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286        );
287        let per_min_quota =
288            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290            }));
291
292        vec![
293            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295        ]
296    }
297
298    fn rate_limit_keys() -> Vec<Ustr> {
299        vec![
300            Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301            Ustr::from(BITMEX_MINUTE_RATE_KEY),
302        ]
303    }
304
    /// Cancel all pending HTTP requests.
    ///
    /// Cancels the client's cancellation token, which is the token passed to the
    /// retry manager for every request.
    ///
    // NOTE(review): the token is never replaced in the code visible here; if a
    // cancelled `CancellationToken` stays cancelled, requests issued after this call
    // would presumably be cancelled as well — confirm this is intended.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
309
    /// Get the cancellation token for this client.
    ///
    /// Returns a reference to the same token that is passed to the retry manager
    /// for every request.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
314
315    fn sign_request(
316        &self,
317        method: &Method,
318        endpoint: &str,
319        body: Option<&[u8]>,
320    ) -> Result<HashMap<String, String>, BitmexHttpError> {
321        let credential = self
322            .credential
323            .as_ref()
324            .ok_or(BitmexHttpError::MissingCredentials)?;
325
326        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
327        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
328
329        let full_path = if endpoint.starts_with("/api/v1") {
330            endpoint.to_string()
331        } else {
332            format!("/api/v1{endpoint}")
333        };
334
335        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
336
337        let mut headers = HashMap::new();
338        headers.insert("api-expires".to_string(), expires.to_string());
339        headers.insert("api-key".to_string(), credential.api_key.to_string());
340        headers.insert("api-signature".to_string(), signature);
341
342        // Add Content-Type header for form-encoded body
343        if body.is_some()
344            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
345        {
346            headers.insert(
347                "Content-Type".to_string(),
348                "application/x-www-form-urlencoded".to_string(),
349            );
350        }
351
352        Ok(headers)
353    }
354
    /// Sends a request to `endpoint` (appended to `base_url`) through the retry manager
    /// and deserializes a successful response body from JSON into `T`.
    ///
    /// - `params` are URL-encoded into the query string for `GET` and `DELETE` only;
    ///   for any other method they are ignored, so the payload must be passed via `body`.
    /// - `body` is sent as-is.
    /// - When `authenticate` is true the request is signed inside the retried operation,
    ///   over the endpoint including its query string and the body.
    ///
    /// # Errors
    ///
    /// Returns an error if the params cannot be serialized, signing fails, the HTTP
    /// request fails, the response has a non-success status, the body cannot be
    /// deserialized, or the request is cancelled.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        endpoint: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<T, BitmexHttpError> {
        let endpoint = endpoint.to_string();
        let method_clone = method.clone();
        let body_clone = body.clone();

        // Serialize params before closure to avoid reference lifetime issues
        // Query params are used with GET and DELETE methods
        let params_str = if method == Method::GET || method == Method::DELETE {
            params
                .map(serde_urlencoded::to_string)
                .transpose()
                .map_err(|e| {
                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
                })?
        } else {
            None
        };

        // Append the query string only when there is something to append, so an
        // empty serialization does not leave a dangling `?`.
        let full_endpoint = if let Some(ref query) = params_str {
            if query.is_empty() {
                endpoint.clone()
            } else {
                format!("{endpoint}?{query}")
            }
        } else {
            endpoint.clone()
        };

        let url = format!("{}{}", self.base_url, full_endpoint);

        // The operation handed to the retry manager; it clones its inputs on every
        // invocation so it can be called more than once.
        let operation = || {
            let url = url.clone();
            let method = method_clone.clone();
            let body = body_clone.clone();
            let full_endpoint = full_endpoint.clone();

            async move {
                // Signing happens here, inside the operation, rather than once up front.
                let headers = if authenticate {
                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
                } else {
                    None
                };

                let rate_keys = Self::rate_limit_keys();
                let resp = self
                    .client
                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
                    .await?;

                // Success -> parse as `T`; otherwise prefer the structured BitMEX error
                // payload and fall back to the raw status and body.
                if resp.status.is_success() {
                    serde_json::from_slice(&resp.body).map_err(Into::into)
                } else if let Ok(error_resp) =
                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
                {
                    Err(error_resp.into())
                } else {
                    Err(BitmexHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16())
                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
                        body: String::from_utf8_lossy(&resp.body).to_string(),
                    })
                }
            }
        };

        // Retry strategy based on BitMEX error responses and HTTP status codes:
        //
        // 1. Network errors: always retry (transient connection issues).
        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
        // 3. BitMEX JSON errors with specific handling:
        //    - "RateLimitError": explicit rate limit error from BitMEX.
        //    - "HTTPError": generic error name used by BitMEX for various issues
        //      Only retry if message contains "rate limit" to avoid retrying
        //      non-transient errors like authentication failures, validation errors,
        //      insufficient balance, etc. which also return as "HTTPError".
        //
        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
        // be retried. We only retry when the message explicitly mentions rate limiting.
        //
        // See tests in tests/http.rs for retry behavior validation.
        let should_retry = |error: &BitmexHttpError| -> bool {
            match error {
                BitmexHttpError::NetworkError(_) => true,
                BitmexHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                BitmexHttpError::BitmexError {
                    error_name,
                    message,
                } => {
                    error_name == "RateLimitError"
                        || (error_name == "HTTPError"
                            && message.to_lowercase().contains("rate limit"))
                }
                _ => false,
            }
        };

        // Maps a message produced by the retry manager into an error: the exact
        // message "canceled" becomes `Canceled`, anything else a `NetworkError`.
        let create_error = |msg: String| -> BitmexHttpError {
            if msg == "canceled" {
                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                BitmexHttpError::NetworkError(msg)
            }
        };

        // The endpoint without its query string is passed as the first argument
        // (presumably the operation name used by the retry manager — confirm).
        self.retry_manager
            .execute_with_retry_with_cancel(
                endpoint.as_str(),
                operation,
                should_retry,
                create_error,
                &self.cancellation_token,
            )
            .await
    }
479
480    /// Get all instruments.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
485    pub async fn get_instruments(
486        &self,
487        active_only: bool,
488    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
489        let path = if active_only {
490            "/instrument/active"
491        } else {
492            "/instrument"
493        };
494        self.send_request::<_, ()>(Method::GET, path, None, None, false)
495            .await
496    }
497
498    /// Requests the current server time from BitMEX.
499    ///
500    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
501    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if the HTTP request fails or if the response body
506    /// cannot be parsed into [`BitmexApiInfo`].
507    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
508        let response: BitmexApiInfo = self
509            .send_request::<_, ()>(Method::GET, "", None, None, false)
510            .await?;
511        Ok(response.timestamp)
512    }
513
514    /// Get the instrument definition for the specified symbol.
515    ///
516    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
517    /// a single symbol is requested. This helper returns the first element of
518    /// that array and yields `Ok(None)` when the venue returns an empty list
519    /// (e.g. unknown symbol).
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the request fails or the payload cannot be deserialized.
524    pub async fn get_instrument(
525        &self,
526        symbol: &str,
527    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
528        let path = &format!("/instrument?symbol={symbol}");
529        let instruments: Vec<BitmexInstrument> = self
530            .send_request::<_, ()>(Method::GET, path, None, None, false)
531            .await?;
532
533        Ok(instruments.into_iter().next())
534    }
535
536    /// Get user wallet information.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
541    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
542        let endpoint = "/user/wallet";
543        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
544            .await
545    }
546
547    /// Get user margin information.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
552    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
553        let path = format!("/user/margin?currency={currency}");
554        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
555            .await
556    }
557
558    /// Get historical trades.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
563    ///
564    /// # Panics
565    ///
566    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
567    pub async fn get_trades(
568        &self,
569        params: GetTradeParams,
570    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
571        self.send_request(Method::GET, "/trade", Some(&params), None, true)
572            .await
573    }
574
575    /// Get bucketed (aggregated) trade data.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
580    pub async fn get_trade_bucketed(
581        &self,
582        params: GetTradeBucketedParams,
583    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
584        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
585            .await
586    }
587
588    /// Get user orders.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
593    ///
594    /// # Panics
595    ///
596    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
597    pub async fn get_orders(
598        &self,
599        params: GetOrderParams,
600    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
601        self.send_request(Method::GET, "/order", Some(&params), None, true)
602            .await
603    }
604
605    /// Place a new order.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
610    ///
611    /// # Panics
612    ///
613    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
614    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
615        // BitMEX spec requires form-encoded body for POST /order
616        let body = serde_urlencoded::to_string(&params)
617            .map_err(|e| {
618                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
619            })?
620            .into_bytes();
621        let path = "/order";
622        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
623            .await
624    }
625
626    /// Cancel user orders.
627    ///
628    /// # Errors
629    ///
630    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
631    ///
632    /// # Panics
633    ///
634    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
635    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
636        // BitMEX spec requires form-encoded body for DELETE /order
637        let body = serde_urlencoded::to_string(&params)
638            .map_err(|e| {
639                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
640            })?
641            .into_bytes();
642        let path = "/order";
643        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
644            .await
645    }
646
647    /// Amend an existing order.
648    ///
649    /// # Errors
650    ///
651    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
652    ///
653    /// # Panics
654    ///
655    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
656    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
657        // BitMEX spec requires form-encoded body for PUT /order
658        let body = serde_urlencoded::to_string(&params)
659            .map_err(|e| {
660                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
661            })?
662            .into_bytes();
663        let path = "/order";
664        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
665            .await
666    }
667
668    /// Cancel all orders.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
673    ///
674    /// # Panics
675    ///
676    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
677    ///
678    /// # References
679    ///
680    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
681    pub async fn cancel_all_orders(
682        &self,
683        params: DeleteAllOrdersParams,
684    ) -> Result<Value, BitmexHttpError> {
685        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
686            .await
687    }
688
689    /// Get user executions.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
694    ///
695    /// # Panics
696    ///
697    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
698    pub async fn get_executions(
699        &self,
700        params: GetExecutionParams,
701    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
702        let query = serde_urlencoded::to_string(&params).map_err(|e| {
703            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
704        })?;
705        let path = format!("/execution/tradeHistory?{query}");
706        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
707            .await
708    }
709
710    /// Get user positions.
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
715    ///
716    /// # Panics
717    ///
718    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
719    pub async fn get_positions(
720        &self,
721        params: GetPositionParams,
722    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
723        self.send_request(Method::GET, "/position", Some(&params), None, true)
724            .await
725    }
726
727    /// Update position leverage.
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
732    ///
733    /// # Panics
734    ///
735    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
736    pub async fn update_position_leverage(
737        &self,
738        params: PostPositionLeverageParams,
739    ) -> Result<BitmexPosition, BitmexHttpError> {
740        // BitMEX spec requires form-encoded body for POST endpoints
741        let body = serde_urlencoded::to_string(&params)
742            .map_err(|e| {
743                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
744            })?
745            .into_bytes();
746        let path = "/position/leverage";
747        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
748            .await
749    }
750}
751
/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This is the high-level client that wraps the inner client and provides
/// Nautilus-specific functionality for trading operations.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexHttpClient {
    /// Shared low-level client that performs the actual HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Instrument cache behind an `Arc`, so clones of this client share the same map.
    // NOTE(review): keys are presumably instrument symbols — confirm against the code that fills it.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Flag recording whether the instrument cache has been initialized; each clone
    /// gets its own flag, seeded from the source's value at clone time.
    cache_initialized: AtomicBool,
}
766
767impl Clone for BitmexHttpClient {
768    fn clone(&self) -> Self {
769        let cache_initialized = AtomicBool::new(false);
770
771        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
772        if is_initialized {
773            cache_initialized.store(true, Ordering::Release);
774        }
775
776        Self {
777            inner: self.inner.clone(),
778            instruments_cache: self.instruments_cache.clone(),
779            cache_initialized,
780        }
781    }
782}
783
784impl Default for BitmexHttpClient {
785    fn default() -> Self {
786        Self::new(
787            None,
788            None,
789            None,
790            false,
791            Some(60),
792            None,
793            None,
794            None,
795            None,
796            None,
797            None,
798            None, // proxy_url
799        )
800        .expect("Failed to create default BitmexHttpClient")
801    }
802}
803
804impl BitmexHttpClient {
805    /// Creates a new [`BitmexHttpClient`] instance.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the HTTP client cannot be created.
810    #[allow(clippy::too_many_arguments)]
811    pub fn new(
812        base_url: Option<String>,
813        api_key: Option<String>,
814        api_secret: Option<String>,
815        testnet: bool,
816        timeout_secs: Option<u64>,
817        max_retries: Option<u32>,
818        retry_delay_ms: Option<u64>,
819        retry_delay_max_ms: Option<u64>,
820        recv_window_ms: Option<u64>,
821        max_requests_per_second: Option<u32>,
822        max_requests_per_minute: Option<u32>,
823        proxy_url: Option<String>,
824    ) -> Result<Self, BitmexHttpError> {
825        // Determine the base URL
826        let url = base_url.unwrap_or_else(|| {
827            if testnet {
828                BITMEX_HTTP_TESTNET_URL.to_string()
829            } else {
830                BITMEX_HTTP_URL.to_string()
831            }
832        });
833
834        let inner = match (api_key, api_secret) {
835            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
836                key,
837                secret,
838                url,
839                timeout_secs,
840                max_retries,
841                retry_delay_ms,
842                retry_delay_max_ms,
843                recv_window_ms,
844                max_requests_per_second,
845                max_requests_per_minute,
846                proxy_url,
847            )?,
848            _ => BitmexRawHttpClient::new(
849                Some(url),
850                timeout_secs,
851                max_retries,
852                retry_delay_ms,
853                retry_delay_max_ms,
854                recv_window_ms,
855                max_requests_per_second,
856                max_requests_per_minute,
857                proxy_url,
858            )?,
859        };
860
861        Ok(Self {
862            inner: Arc::new(inner),
863            instruments_cache: Arc::new(DashMap::new()),
864            cache_initialized: AtomicBool::new(false),
865        })
866    }
867
868    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
869    /// the default BitMEX HTTP base URL.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if required environment variables are not set or invalid.
874    pub fn from_env() -> anyhow::Result<Self> {
875        Self::with_credentials(
876            None, None, None, None, None, None, None, None, None, None, None,
877        )
878        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
879    }
880
881    /// Creates a new [`BitmexHttpClient`] configured with credentials
882    /// for authenticated requests.
883    ///
884    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
885    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if one credential is provided without the other.
890    #[allow(clippy::too_many_arguments)]
891    pub fn with_credentials(
892        api_key: Option<String>,
893        api_secret: Option<String>,
894        base_url: Option<String>,
895        timeout_secs: Option<u64>,
896        max_retries: Option<u32>,
897        retry_delay_ms: Option<u64>,
898        retry_delay_max_ms: Option<u64>,
899        recv_window_ms: Option<u64>,
900        max_requests_per_second: Option<u32>,
901        max_requests_per_minute: Option<u32>,
902        proxy_url: Option<String>,
903    ) -> anyhow::Result<Self> {
904        // Determine testnet from URL first to select correct environment variables
905        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
906
907        // Choose environment variables based on testnet flag
908        let (key_var, secret_var) = if testnet {
909            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
910        } else {
911            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
912        };
913
914        let api_key = api_key.or_else(|| get_env_var(key_var).ok());
915        let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
916
917        // If we're trying to create an authenticated client, we need both key and secret
918        if api_key.is_some() && api_secret.is_none() {
919            anyhow::bail!("{secret_var} is required when {key_var} is provided");
920        }
921        if api_key.is_none() && api_secret.is_some() {
922            anyhow::bail!("{key_var} is required when {secret_var} is provided");
923        }
924
925        Self::new(
926            base_url,
927            api_key,
928            api_secret,
929            testnet,
930            timeout_secs,
931            max_retries,
932            retry_delay_ms,
933            retry_delay_max_ms,
934            recv_window_ms,
935            max_requests_per_second,
936            max_requests_per_minute,
937            proxy_url,
938        )
939        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
940    }
941
942    /// Returns the base url being used by the client.
943    #[must_use]
944    pub fn base_url(&self) -> &str {
945        self.inner.base_url.as_str()
946    }
947
948    /// Returns the public API key being used by the client.
949    #[must_use]
950    pub fn api_key(&self) -> Option<&str> {
951        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
952    }
953
954    /// Returns a masked version of the API key for logging purposes.
955    #[must_use]
956    pub fn api_key_masked(&self) -> Option<String> {
957        self.inner.credential.as_ref().map(|c| c.api_key_masked())
958    }
959
960    /// Requests the current server time from BitMEX.
961    ///
962    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
963    ///
964    /// # Errors
965    ///
966    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
967    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
968        self.inner.get_server_time().await
969    }
970
971    /// Generates a timestamp for initialization.
972    fn generate_ts_init(&self) -> UnixNanos {
973        get_atomic_clock_realtime().get_time_ns()
974    }
975
976    /// Check if the order has a contingency type that requires linking.
977    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
978        matches!(
979            contingency_type,
980            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
981        )
982    }
983
984    /// Check if the order is a parent in contingency relationships.
985    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
986        matches!(
987            contingency_type,
988            ContingencyType::Oco | ContingencyType::Oto
989        )
990    }
991
992    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
993    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
994        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
995        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
996        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
997        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
998
999        for report in reports.iter() {
1000            let Some(client_order_id) = report.client_order_id else {
1001                continue;
1002            };
1003
1004            if let Some(order_list_id) = report.order_list_id {
1005                order_list_groups
1006                    .entry(order_list_id)
1007                    .or_default()
1008                    .push(client_order_id);
1009
1010                if Self::is_parent_contingency(report.contingency_type) {
1011                    order_list_parents
1012                        .entry(order_list_id)
1013                        .or_insert(client_order_id);
1014                }
1015            }
1016
1017            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1018                && Self::is_contingent_order(report.contingency_type)
1019            {
1020                prefix_groups
1021                    .entry(base.to_owned())
1022                    .or_default()
1023                    .push(client_order_id);
1024
1025                if Self::is_parent_contingency(report.contingency_type) {
1026                    prefix_parents
1027                        .entry(base.to_owned())
1028                        .or_insert(client_order_id);
1029                }
1030            }
1031        }
1032
1033        for report in reports.iter_mut() {
1034            let Some(client_order_id) = report.client_order_id else {
1035                continue;
1036            };
1037
1038            if report.linked_order_ids.is_some() {
1039                continue;
1040            }
1041
1042            // Only process contingent orders
1043            if !Self::is_contingent_order(report.contingency_type) {
1044                continue;
1045            }
1046
1047            if let Some(order_list_id) = report.order_list_id
1048                && let Some(group) = order_list_groups.get(&order_list_id)
1049            {
1050                let mut linked: Vec<ClientOrderId> = group
1051                    .iter()
1052                    .copied()
1053                    .filter(|candidate| candidate != &client_order_id)
1054                    .collect();
1055
1056                if !linked.is_empty() {
1057                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1058                        if client_order_id != *parent_id {
1059                            linked.sort_by_key(
1060                                |candidate| {
1061                                    if candidate == parent_id { 0 } else { 1 }
1062                                },
1063                            );
1064                            report.parent_order_id = Some(*parent_id);
1065                        } else {
1066                            report.parent_order_id = None;
1067                        }
1068                    } else {
1069                        report.parent_order_id = None;
1070                    }
1071
1072                    tracing::trace!(
1073                        client_order_id = ?client_order_id,
1074                        order_list_id = ?order_list_id,
1075                        contingency_type = ?report.contingency_type,
1076                        linked_order_ids = ?linked,
1077                        "BitMEX linked ids sourced from order list id",
1078                    );
1079                    report.linked_order_ids = Some(linked);
1080                    continue;
1081                }
1082
1083                tracing::trace!(
1084                    client_order_id = ?client_order_id,
1085                    order_list_id = ?order_list_id,
1086                    contingency_type = ?report.contingency_type,
1087                    order_list_group = ?group,
1088                    "BitMEX order list id group had no peers",
1089                );
1090                report.parent_order_id = None;
1091            } else if report.order_list_id.is_none() {
1092                report.parent_order_id = None;
1093            }
1094
1095            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1096                && let Some(group) = prefix_groups.get(base)
1097            {
1098                let mut linked: Vec<ClientOrderId> = group
1099                    .iter()
1100                    .copied()
1101                    .filter(|candidate| candidate != &client_order_id)
1102                    .collect();
1103
1104                if !linked.is_empty() {
1105                    if let Some(parent_id) = prefix_parents.get(base) {
1106                        if client_order_id != *parent_id {
1107                            linked.sort_by_key(
1108                                |candidate| {
1109                                    if candidate == parent_id { 0 } else { 1 }
1110                                },
1111                            );
1112                            report.parent_order_id = Some(*parent_id);
1113                        } else {
1114                            report.parent_order_id = None;
1115                        }
1116                    } else {
1117                        report.parent_order_id = None;
1118                    }
1119
1120                    tracing::trace!(
1121                        client_order_id = ?client_order_id,
1122                        contingency_type = ?report.contingency_type,
1123                        base = base,
1124                        linked_order_ids = ?linked,
1125                        "BitMEX linked ids constructed from client order id prefix",
1126                    );
1127                    report.linked_order_ids = Some(linked);
1128                    continue;
1129                }
1130
1131                tracing::trace!(
1132                    client_order_id = ?client_order_id,
1133                    contingency_type = ?report.contingency_type,
1134                    base = base,
1135                    prefix_group = ?group,
1136                    "BitMEX client order id prefix group had no peers",
1137                );
1138                report.parent_order_id = None;
1139            } else if client_order_id.as_str().contains('-') {
1140                report.parent_order_id = None;
1141            }
1142
1143            if Self::is_contingent_order(report.contingency_type) {
1144                tracing::warn!(
1145                    client_order_id = ?report.client_order_id,
1146                    order_list_id = ?report.order_list_id,
1147                    contingency_type = ?report.contingency_type,
1148                    "BitMEX order status report missing linked ids after grouping",
1149                );
1150                report.contingency_type = ContingencyType::NoContingency;
1151                report.parent_order_id = None;
1152            }
1153
1154            report.linked_order_ids = None;
1155        }
1156    }
1157
1158    /// Cancel all pending HTTP requests.
1159    pub fn cancel_all_requests(&self) {
1160        self.inner.cancel_all_requests();
1161    }
1162
1163    /// Get the cancellation token for this client.
1164    pub fn cancellation_token(&self) -> CancellationToken {
1165        self.inner.cancellation_token().clone()
1166    }
1167
1168    /// Caches a single instrument.
1169    ///
1170    /// Any existing instrument with the same symbol will be replaced.
1171    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1172        self.instruments_cache
1173            .insert(instrument.raw_symbol().inner(), instrument);
1174        self.cache_initialized.store(true, Ordering::Release);
1175    }
1176
1177    /// Gets an instrument from the cache by symbol.
1178    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1179        self.instruments_cache
1180            .get(symbol)
1181            .map(|entry| entry.value().clone())
1182    }
1183
1184    /// Request a single instrument and parse it into a Nautilus type.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1189    /// successfully, `Ok(None)` when the instrument is unknown or the payload
1190    /// cannot be converted into a Nautilus `Instrument`.
1191    pub async fn request_instrument(
1192        &self,
1193        instrument_id: InstrumentId,
1194    ) -> anyhow::Result<Option<InstrumentAny>> {
1195        let response = self
1196            .inner
1197            .get_instrument(instrument_id.symbol.as_str())
1198            .await?;
1199
1200        let instrument = match response {
1201            Some(instrument) => instrument,
1202            None => return Ok(None),
1203        };
1204
1205        let ts_init = self.generate_ts_init();
1206
1207        Ok(parse_instrument_any(&instrument, ts_init))
1208    }
1209
1210    /// Request all available instruments and parse them into Nautilus types.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if the HTTP request fails or parsing fails.
1215    pub async fn request_instruments(
1216        &self,
1217        active_only: bool,
1218    ) -> anyhow::Result<Vec<InstrumentAny>> {
1219        let instruments = self.inner.get_instruments(active_only).await?;
1220        let ts_init = self.generate_ts_init();
1221
1222        let mut parsed_instruments = Vec::new();
1223        let mut failed_count = 0;
1224        let total_count = instruments.len();
1225
1226        for inst in instruments {
1227            if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1228                parsed_instruments.push(instrument_any);
1229            } else {
1230                failed_count += 1;
1231                tracing::error!(
1232                    "Failed to parse instrument: symbol={}, type={:?}, state={:?} - instrument will not be cached",
1233                    inst.symbol,
1234                    inst.instrument_type,
1235                    inst.state
1236                );
1237            }
1238        }
1239
1240        if failed_count > 0 {
1241            tracing::error!(
1242                "Instrument parse failures: {} failed out of {} total ({}  successfully parsed)",
1243                failed_count,
1244                total_count,
1245                parsed_instruments.len()
1246            );
1247        }
1248
1249        Ok(parsed_instruments)
1250    }
1251
1252    /// Get user wallet information.
1253    ///
1254    /// # Errors
1255    ///
1256    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1257    ///
1258    /// # Panics
1259    ///
1260    /// Panics if the inner mutex is poisoned.
1261    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1262        let inner = self.inner.clone();
1263        inner.get_wallet().await
1264    }
1265
1266    /// Get user orders.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1271    ///
1272    /// # Panics
1273    ///
1274    /// Panics if the inner mutex is poisoned.
1275    pub async fn get_orders(
1276        &self,
1277        params: GetOrderParams,
1278    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1279        let inner = self.inner.clone();
1280        inner.get_orders(params).await
1281    }
1282
1283    /// Get instrument from the instruments cache (if found).
1284    ///
1285    /// # Errors
1286    ///
1287    /// Returns an error if the instrument is not found in the cache.
1288    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1289        self.get_instrument(&symbol).ok_or_else(|| {
1290            anyhow::anyhow!(
1291                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1292            )
1293        })
1294    }
1295
1296    /// Returns the cached price precision for the given symbol.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if the instrument was never cached (for example, if
1301    /// instruments were not loaded prior to use).
1302    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1303        self.instrument_from_cache(symbol)
1304            .map(|instrument| instrument.price_precision())
1305    }
1306
1307    /// Get user margin information.
1308    ///
1309    /// # Errors
1310    ///
1311    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1312    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1313        self.inner
1314            .get_margin(currency)
1315            .await
1316            .map_err(|e| anyhow::anyhow!(e))
1317    }
1318
1319    /// Request account state for the given account.
1320    ///
1321    /// # Errors
1322    ///
1323    /// Returns an error if the HTTP request fails or no account state is returned.
1324    pub async fn request_account_state(
1325        &self,
1326        account_id: AccountId,
1327    ) -> anyhow::Result<AccountState> {
1328        // Get margin data for XBt (Bitcoin) by default
1329        let margin = self
1330            .inner
1331            .get_margin("XBt")
1332            .await
1333            .map_err(|e| anyhow::anyhow!(e))?;
1334
1335        let ts_init = nautilus_core::nanos::UnixNanos::from(
1336            chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1337        );
1338
1339        // Convert HTTP Margin to WebSocket MarginMsg for parsing
1340        let margin_msg = BitmexMarginMsg {
1341            account: margin.account,
1342            currency: margin.currency,
1343            risk_limit: margin.risk_limit,
1344            amount: margin.amount,
1345            prev_realised_pnl: margin.prev_realised_pnl,
1346            gross_comm: margin.gross_comm,
1347            gross_open_cost: margin.gross_open_cost,
1348            gross_open_premium: margin.gross_open_premium,
1349            gross_exec_cost: margin.gross_exec_cost,
1350            gross_mark_value: margin.gross_mark_value,
1351            risk_value: margin.risk_value,
1352            init_margin: margin.init_margin,
1353            maint_margin: margin.maint_margin,
1354            target_excess_margin: margin.target_excess_margin,
1355            realised_pnl: margin.realised_pnl,
1356            unrealised_pnl: margin.unrealised_pnl,
1357            wallet_balance: margin.wallet_balance,
1358            margin_balance: margin.margin_balance,
1359            margin_leverage: margin.margin_leverage,
1360            margin_used_pcnt: margin.margin_used_pcnt,
1361            excess_margin: margin.excess_margin,
1362            available_margin: margin.available_margin,
1363            withdrawable_margin: margin.withdrawable_margin,
1364            maker_fee_discount: None, // Not in HTTP response
1365            taker_fee_discount: None, // Not in HTTP response
1366            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1367            foreign_margin_balance: None,
1368            foreign_requirement: None,
1369        };
1370
1371        parse_account_state(&margin_msg, account_id, ts_init)
1372    }
1373
1374    /// Submit a new order.
1375    ///
1376    /// # Errors
1377    ///
1378    /// Returns an error if credentials are missing, the request fails, order validation fails,
1379    /// the order is rejected, or the API returns an error.
1380    #[allow(clippy::too_many_arguments)]
1381    pub async fn submit_order(
1382        &self,
1383        instrument_id: InstrumentId,
1384        client_order_id: ClientOrderId,
1385        order_side: OrderSide,
1386        order_type: OrderType,
1387        quantity: Quantity,
1388        time_in_force: TimeInForce,
1389        price: Option<Price>,
1390        trigger_price: Option<Price>,
1391        trigger_type: Option<TriggerType>,
1392        display_qty: Option<Quantity>,
1393        post_only: bool,
1394        reduce_only: bool,
1395        order_list_id: Option<OrderListId>,
1396        contingency_type: Option<ContingencyType>,
1397    ) -> anyhow::Result<OrderStatusReport> {
1398        use crate::common::enums::{
1399            BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1400        };
1401
1402        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1403
1404        let mut params = super::query::PostOrderParamsBuilder::default();
1405        params.text(NAUTILUS_TRADER);
1406        params.symbol(instrument_id.symbol.as_str());
1407        params.cl_ord_id(client_order_id.as_str());
1408
1409        let side = BitmexSide::try_from_order_side(order_side)?;
1410        params.side(side);
1411
1412        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1413        params.ord_type(ord_type);
1414
1415        params.order_qty(quantity_to_u32(&quantity, &instrument));
1416
1417        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1418        params.time_in_force(tif);
1419
1420        if let Some(price) = price {
1421            params.price(price.as_f64());
1422        }
1423
1424        if let Some(trigger_price) = trigger_price {
1425            params.stop_px(trigger_price.as_f64());
1426        }
1427
1428        if let Some(display_qty) = display_qty {
1429            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1430        }
1431
1432        if let Some(order_list_id) = order_list_id {
1433            params.cl_ord_link_id(order_list_id.as_str());
1434        }
1435
1436        let mut exec_inst = Vec::new();
1437
1438        if post_only {
1439            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1440        }
1441
1442        if reduce_only {
1443            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1444        }
1445
1446        if trigger_price.is_some()
1447            && let Some(trigger_type) = trigger_type
1448        {
1449            match trigger_type {
1450                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1451                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1452                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1453                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1454            }
1455        }
1456
1457        if !exec_inst.is_empty() {
1458            params.exec_inst(exec_inst);
1459        }
1460
1461        if let Some(contingency_type) = contingency_type {
1462            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1463            params.contingency_type(bitmex_contingency);
1464        }
1465
1466        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1467
1468        let response = self.inner.place_order(params).await?;
1469
1470        let order: BitmexOrder = serde_json::from_value(response)?;
1471
1472        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1473            let reason = order
1474                .ord_rej_reason
1475                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1476            anyhow::bail!("Order rejected: {reason}");
1477        }
1478
1479        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1480        let ts_init = self.generate_ts_init();
1481
1482        parse_order_status_report(&order, &instrument, ts_init)
1483    }
1484
1485    /// Cancel an order.
1486    ///
1487    /// # Errors
1488    ///
1489    /// Returns an error if:
1490    /// - Credentials are missing.
1491    /// - The request fails.
1492    /// - The order doesn't exist.
1493    /// - The API returns an error.
1494    pub async fn cancel_order(
1495        &self,
1496        instrument_id: InstrumentId,
1497        client_order_id: Option<ClientOrderId>,
1498        venue_order_id: Option<VenueOrderId>,
1499    ) -> anyhow::Result<OrderStatusReport> {
1500        let mut params = super::query::DeleteOrderParamsBuilder::default();
1501        params.text(NAUTILUS_TRADER);
1502
1503        if let Some(venue_order_id) = venue_order_id {
1504            params.order_id(vec![venue_order_id.as_str().to_string()]);
1505        } else if let Some(client_order_id) = client_order_id {
1506            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1507        } else {
1508            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1509        }
1510
1511        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1512
1513        let response = self.inner.cancel_orders(params).await?;
1514
1515        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1516        let order = orders
1517            .into_iter()
1518            .next()
1519            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1520
1521        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1522        let ts_init = self.generate_ts_init();
1523
1524        parse_order_status_report(&order, &instrument, ts_init)
1525    }
1526
1527    /// Cancel multiple orders.
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns an error if:
1532    /// - Credentials are missing.
1533    /// - The request fails.
1534    /// - The order doesn't exist.
1535    /// - The API returns an error.
1536    pub async fn cancel_orders(
1537        &self,
1538        instrument_id: InstrumentId,
1539        client_order_ids: Option<Vec<ClientOrderId>>,
1540        venue_order_ids: Option<Vec<VenueOrderId>>,
1541    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1542        let mut params = super::query::DeleteOrderParamsBuilder::default();
1543        params.text(NAUTILUS_TRADER);
1544
1545        // BitMEX API requires either client order IDs or venue order IDs, not both
1546        // Prioritize venue order IDs if both are provided
1547        if let Some(venue_order_ids) = venue_order_ids {
1548            if venue_order_ids.is_empty() {
1549                anyhow::bail!("venue_order_ids cannot be empty");
1550            }
1551            params.order_id(
1552                venue_order_ids
1553                    .iter()
1554                    .map(|id| id.to_string())
1555                    .collect::<Vec<_>>(),
1556            );
1557        } else if let Some(client_order_ids) = client_order_ids {
1558            if client_order_ids.is_empty() {
1559                anyhow::bail!("client_order_ids cannot be empty");
1560            }
1561            params.cl_ord_id(
1562                client_order_ids
1563                    .iter()
1564                    .map(|id| id.to_string())
1565                    .collect::<Vec<_>>(),
1566            );
1567        } else {
1568            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1569        }
1570
1571        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1572
1573        let response = self.inner.cancel_orders(params).await?;
1574
1575        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1576
1577        let ts_init = self.generate_ts_init();
1578        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1579
1580        let mut reports = Vec::new();
1581
1582        for order in orders {
1583            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1584        }
1585
1586        Self::populate_linked_order_ids(&mut reports);
1587
1588        Ok(reports)
1589    }
1590
1591    /// Cancel all orders for an instrument and optionally an order side.
1592    ///
1593    /// # Errors
1594    ///
1595    /// Returns an error if:
1596    /// - Credentials are missing.
1597    /// - The request fails.
1598    /// - The order doesn't exist.
1599    /// - The API returns an error.
1600    pub async fn cancel_all_orders(
1601        &self,
1602        instrument_id: InstrumentId,
1603        order_side: Option<OrderSide>,
1604    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1605        let mut params = DeleteAllOrdersParamsBuilder::default();
1606        params.text(NAUTILUS_TRADER);
1607        params.symbol(instrument_id.symbol.as_str());
1608
1609        if let Some(side) = order_side {
1610            let side = BitmexSide::try_from_order_side(side)?;
1611            params.filter(serde_json::json!({
1612                "side": side
1613            }));
1614        }
1615
1616        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1617
1618        let response = self.inner.cancel_all_orders(params).await?;
1619
1620        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1621
1622        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1623        let ts_init = self.generate_ts_init();
1624
1625        let mut reports = Vec::new();
1626
1627        for order in orders {
1628            reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1629        }
1630
1631        Self::populate_linked_order_ids(&mut reports);
1632
1633        Ok(reports)
1634    }
1635
1636    /// Modify an existing order.
1637    ///
1638    /// # Errors
1639    ///
1640    /// Returns an error if:
1641    /// - Credentials are missing.
1642    /// - The request fails.
1643    /// - The order doesn't exist.
1644    /// - The order is already closed.
1645    /// - The API returns an error.
1646    pub async fn modify_order(
1647        &self,
1648        instrument_id: InstrumentId,
1649        client_order_id: Option<ClientOrderId>,
1650        venue_order_id: Option<VenueOrderId>,
1651        quantity: Option<Quantity>,
1652        price: Option<Price>,
1653        trigger_price: Option<Price>,
1654    ) -> anyhow::Result<OrderStatusReport> {
1655        let mut params = PutOrderParamsBuilder::default();
1656        params.text(NAUTILUS_TRADER);
1657
1658        // Set order ID - prefer venue_order_id if available
1659        if let Some(venue_order_id) = venue_order_id {
1660            params.order_id(venue_order_id.as_str());
1661        } else if let Some(client_order_id) = client_order_id {
1662            params.orig_cl_ord_id(client_order_id.as_str());
1663        } else {
1664            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1665        }
1666
1667        if let Some(quantity) = quantity {
1668            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1669            params.order_qty(quantity_to_u32(&quantity, &instrument));
1670        }
1671
1672        if let Some(price) = price {
1673            params.price(price.as_f64());
1674        }
1675
1676        if let Some(trigger_price) = trigger_price {
1677            params.stop_px(trigger_price.as_f64());
1678        }
1679
1680        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1681
1682        let response = self.inner.amend_order(params).await?;
1683
1684        let order: BitmexOrder = serde_json::from_value(response)?;
1685
1686        if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1687            let reason = order
1688                .ord_rej_reason
1689                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1690            anyhow::bail!("Order modification rejected: {reason}");
1691        }
1692
1693        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1694        let ts_init = self.generate_ts_init();
1695
1696        parse_order_status_report(&order, &instrument, ts_init)
1697    }
1698
1699    /// Query a single order by client order ID or venue order ID.
1700    ///
1701    /// # Errors
1702    ///
1703    /// Returns an error if:
1704    /// - Credentials are missing.
1705    /// - The request fails.
1706    /// - The API returns an error.
1707    pub async fn query_order(
1708        &self,
1709        instrument_id: InstrumentId,
1710        client_order_id: Option<ClientOrderId>,
1711        venue_order_id: Option<VenueOrderId>,
1712    ) -> anyhow::Result<Option<OrderStatusReport>> {
1713        let mut params = GetOrderParamsBuilder::default();
1714
1715        let filter_json = if let Some(client_order_id) = client_order_id {
1716            serde_json::json!({
1717                "clOrdID": client_order_id.to_string()
1718            })
1719        } else if let Some(venue_order_id) = venue_order_id {
1720            serde_json::json!({
1721                "orderID": venue_order_id.to_string()
1722            })
1723        } else {
1724            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1725        };
1726
1727        params.filter(filter_json);
1728        params.count(1); // Only need one order
1729
1730        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1731
1732        let response = self.inner.get_orders(params).await?;
1733
1734        if response.is_empty() {
1735            return Ok(None);
1736        }
1737
1738        let order = &response[0];
1739
1740        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1741        let ts_init = self.generate_ts_init();
1742
1743        let report = parse_order_status_report(order, &instrument, ts_init)?;
1744
1745        Ok(Some(report))
1746    }
1747
1748    /// Request a single order status report.
1749    ///
1750    /// # Errors
1751    ///
1752    /// Returns an error if:
1753    /// - Credentials are missing.
1754    /// - The request fails.
1755    /// - The API returns an error.
1756    pub async fn request_order_status_report(
1757        &self,
1758        instrument_id: InstrumentId,
1759        client_order_id: Option<ClientOrderId>,
1760        venue_order_id: Option<VenueOrderId>,
1761    ) -> anyhow::Result<OrderStatusReport> {
1762        let mut params = GetOrderParamsBuilder::default();
1763        params.symbol(instrument_id.symbol.as_str());
1764
1765        if let Some(venue_order_id) = venue_order_id {
1766            params.filter(serde_json::json!({
1767                "orderID": venue_order_id.as_str()
1768            }));
1769        } else if let Some(client_order_id) = client_order_id {
1770            params.filter(serde_json::json!({
1771                "clOrdID": client_order_id.as_str()
1772            }));
1773        }
1774
1775        params.count(1i32);
1776        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1777
1778        let response = self.inner.get_orders(params).await?;
1779
1780        let order = response
1781            .into_iter()
1782            .next()
1783            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1784
1785        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1786        let ts_init = self.generate_ts_init();
1787
1788        parse_order_status_report(&order, &instrument, ts_init)
1789    }
1790
1791    /// Request multiple order status reports.
1792    ///
1793    /// # Errors
1794    ///
1795    /// Returns an error if:
1796    /// - Credentials are missing.
1797    /// - The request fails.
1798    /// - The API returns an error.
1799    pub async fn request_order_status_reports(
1800        &self,
1801        instrument_id: Option<InstrumentId>,
1802        open_only: bool,
1803        limit: Option<u32>,
1804    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1805        let mut params = GetOrderParamsBuilder::default();
1806
1807        if let Some(instrument_id) = &instrument_id {
1808            params.symbol(instrument_id.symbol.as_str());
1809        }
1810
1811        if open_only {
1812            params.filter(serde_json::json!({
1813                "open": true
1814            }));
1815        }
1816
1817        if let Some(limit) = limit {
1818            params.count(limit as i32);
1819        } else {
1820            params.count(500); // Default count to avoid empty query
1821        }
1822
1823        params.reverse(true); // Get newest orders first
1824
1825        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1826
1827        let response = self.inner.get_orders(params).await?;
1828
1829        let ts_init = self.generate_ts_init();
1830
1831        let mut reports = Vec::new();
1832
1833        for order in response {
1834            // Skip orders without symbol (can happen with query responses)
1835            let Some(symbol) = order.symbol else {
1836                tracing::warn!("Order response missing symbol, skipping");
1837                continue;
1838            };
1839
1840            let Ok(instrument) = self.instrument_from_cache(symbol) else {
1841                tracing::debug!(
1842                    symbol = %symbol,
1843                    "Skipping order report for instrument not in cache"
1844                );
1845                continue;
1846            };
1847
1848            match parse_order_status_report(&order, &instrument, ts_init) {
1849                Ok(report) => reports.push(report),
1850                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1851            }
1852        }
1853
1854        Self::populate_linked_order_ids(&mut reports);
1855
1856        Ok(reports)
1857    }
1858
1859    /// Request trades for the given instrument.
1860    ///
1861    /// # Errors
1862    ///
1863    /// Returns an error if the HTTP request fails or parsing fails.
1864    pub async fn request_trades(
1865        &self,
1866        instrument_id: InstrumentId,
1867        start: Option<DateTime<Utc>>,
1868        end: Option<DateTime<Utc>>,
1869        limit: Option<u32>,
1870    ) -> anyhow::Result<Vec<TradeTick>> {
1871        let mut params = GetTradeParamsBuilder::default();
1872        params.symbol(instrument_id.symbol.as_str());
1873
1874        if let Some(start) = start {
1875            params.start_time(start);
1876        }
1877
1878        if let Some(end) = end {
1879            params.end_time(end);
1880        }
1881
1882        if let (Some(start), Some(end)) = (start, end) {
1883            anyhow::ensure!(
1884                start < end,
1885                "Invalid time range: start={start:?} end={end:?}",
1886            );
1887        }
1888
1889        if let Some(limit) = limit {
1890            let clamped_limit = limit.min(1000);
1891            if limit > 1000 {
1892                tracing::warn!(
1893                    limit,
1894                    clamped_limit,
1895                    "BitMEX trade request limit exceeds venue maximum; clamping",
1896                );
1897            }
1898            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1899        }
1900        params.reverse(false);
1901        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1902
1903        let response = self.inner.get_trades(params).await?;
1904
1905        let ts_init = self.generate_ts_init();
1906
1907        let mut parsed_trades = Vec::new();
1908
1909        for trade in response {
1910            if let Some(start) = start
1911                && trade.timestamp < start
1912            {
1913                continue;
1914            }
1915
1916            if let Some(end) = end
1917                && trade.timestamp > end
1918            {
1919                continue;
1920            }
1921
1922            let price_precision = self.get_price_precision(trade.symbol)?;
1923
1924            match parse_trade(trade, price_precision, ts_init) {
1925                Ok(trade) => parsed_trades.push(trade),
1926                Err(e) => tracing::error!("Failed to parse trade: {e}"),
1927            }
1928        }
1929
1930        Ok(parsed_trades)
1931    }
1932
1933    /// Request bars for the given bar type.
1934    ///
1935    /// # Errors
1936    ///
1937    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
1938    /// unsupported by BitMEX.
1939    pub async fn request_bars(
1940        &self,
1941        mut bar_type: BarType,
1942        start: Option<DateTime<Utc>>,
1943        end: Option<DateTime<Utc>>,
1944        limit: Option<u32>,
1945        partial: bool,
1946    ) -> anyhow::Result<Vec<Bar>> {
1947        bar_type = bar_type.standard();
1948
1949        anyhow::ensure!(
1950            bar_type.aggregation_source() == AggregationSource::External,
1951            "Only EXTERNAL aggregation bars are supported"
1952        );
1953        anyhow::ensure!(
1954            bar_type.spec().price_type == PriceType::Last,
1955            "Only LAST price type bars are supported"
1956        );
1957        if let (Some(start), Some(end)) = (start, end) {
1958            anyhow::ensure!(
1959                start < end,
1960                "Invalid time range: start={start:?} end={end:?}"
1961            );
1962        }
1963
1964        let spec = bar_type.spec();
1965        let bin_size = match (spec.aggregation, spec.step.get()) {
1966            (BarAggregation::Minute, 1) => "1m",
1967            (BarAggregation::Minute, 5) => "5m",
1968            (BarAggregation::Hour, 1) => "1h",
1969            (BarAggregation::Day, 1) => "1d",
1970            _ => anyhow::bail!(
1971                "BitMEX does not support {}-{:?}-{:?} bars",
1972                spec.step.get(),
1973                spec.aggregation,
1974                spec.price_type,
1975            ),
1976        };
1977
1978        let instrument_id = bar_type.instrument_id();
1979        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1980
1981        let mut params = GetTradeBucketedParamsBuilder::default();
1982        params.symbol(instrument_id.symbol.as_str());
1983        params.bin_size(bin_size);
1984        if partial {
1985            params.partial(true);
1986        }
1987        if let Some(start) = start {
1988            params.start_time(start);
1989        }
1990        if let Some(end) = end {
1991            params.end_time(end);
1992        }
1993        if let Some(limit) = limit {
1994            let clamped_limit = limit.min(1000);
1995            if limit > 1000 {
1996                tracing::warn!(
1997                    limit,
1998                    clamped_limit,
1999                    "BitMEX bar request limit exceeds venue maximum; clamping",
2000                );
2001            }
2002            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2003        }
2004        params.reverse(false);
2005        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2006
2007        let response = self.inner.get_trade_bucketed(params).await?;
2008        let ts_init = self.generate_ts_init();
2009        let mut bars = Vec::new();
2010
2011        for bin in response {
2012            if let Some(start) = start
2013                && bin.timestamp < start
2014            {
2015                continue;
2016            }
2017            if let Some(end) = end
2018                && bin.timestamp > end
2019            {
2020                continue;
2021            }
2022            if bin.symbol != instrument_id.symbol.inner() {
2023                tracing::warn!(
2024                    symbol = %bin.symbol,
2025                    expected = %instrument_id.symbol,
2026                    "Skipping trade bin for unexpected symbol",
2027                );
2028                continue;
2029            }
2030
2031            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2032                Ok(bar) => bars.push(bar),
2033                Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2034            }
2035        }
2036
2037        Ok(bars)
2038    }
2039
2040    /// Request fill reports for the given instrument.
2041    ///
2042    /// # Errors
2043    ///
2044    /// Returns an error if the HTTP request fails or parsing fails.
2045    pub async fn request_fill_reports(
2046        &self,
2047        instrument_id: Option<InstrumentId>,
2048        limit: Option<u32>,
2049    ) -> anyhow::Result<Vec<FillReport>> {
2050        let mut params = GetExecutionParamsBuilder::default();
2051        if let Some(instrument_id) = instrument_id {
2052            params.symbol(instrument_id.symbol.as_str());
2053        }
2054        if let Some(limit) = limit {
2055            params.count(limit as i32);
2056        } else {
2057            params.count(500); // Default count
2058        }
2059        params.reverse(true); // Get newest fills first
2060
2061        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2062
2063        let response = self.inner.get_executions(params).await?;
2064
2065        let ts_init = self.generate_ts_init();
2066
2067        let mut reports = Vec::new();
2068
2069        for exec in response {
2070            // Skip executions without symbol (e.g., CancelReject)
2071            let Some(symbol) = exec.symbol else {
2072                tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2073                continue;
2074            };
2075            let symbol_str = symbol.to_string();
2076
2077            let instrument = match self.instrument_from_cache(symbol) {
2078                Ok(instrument) => instrument,
2079                Err(e) => {
2080                    tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2081                    continue;
2082                }
2083            };
2084
2085            match parse_fill_report(exec, &instrument, ts_init) {
2086                Ok(report) => reports.push(report),
2087                Err(e) => {
2088                    // Log at debug level for expected skip cases
2089                    let error_msg = e.to_string();
2090                    if error_msg.starts_with("Skipping non-trade execution")
2091                        || error_msg.starts_with("Skipping execution without order_id")
2092                    {
2093                        tracing::debug!("{e}");
2094                    } else {
2095                        tracing::error!("Failed to parse fill report: {e}");
2096                    }
2097                }
2098            }
2099        }
2100
2101        Ok(reports)
2102    }
2103
2104    /// Request position reports.
2105    ///
2106    /// # Errors
2107    ///
2108    /// Returns an error if the HTTP request fails or parsing fails.
2109    pub async fn request_position_status_reports(
2110        &self,
2111    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2112        let params = GetPositionParamsBuilder::default()
2113            .count(500) // Default count
2114            .build()
2115            .map_err(|e| anyhow::anyhow!(e))?;
2116
2117        let response = self.inner.get_positions(params).await?;
2118
2119        let ts_init = self.generate_ts_init();
2120
2121        let mut reports = Vec::new();
2122
2123        for pos in response {
2124            let symbol = pos.symbol;
2125            let instrument = match self.instrument_from_cache(symbol) {
2126                Ok(instrument) => instrument,
2127                Err(e) => {
2128                    tracing::error!(
2129                        symbol = pos.symbol.as_str(),
2130                        "Instrument not found in cache for position parsing: {e}"
2131                    );
2132                    continue;
2133                }
2134            };
2135
2136            match parse_position_report(pos, &instrument, ts_init) {
2137                Ok(report) => reports.push(report),
2138                Err(e) => tracing::error!("Failed to parse position report: {e}"),
2139            }
2140        }
2141
2142        Ok(reports)
2143    }
2144
2145    /// Update position leverage.
2146    ///
2147    /// # Errors
2148    ///
2149    /// - Credentials are missing.
2150    /// - The request fails.
2151    /// - The API returns an error.
2152    pub async fn update_position_leverage(
2153        &self,
2154        symbol: &str,
2155        leverage: f64,
2156    ) -> anyhow::Result<PositionStatusReport> {
2157        let params = PostPositionLeverageParams {
2158            symbol: symbol.to_string(),
2159            leverage,
2160            target_account_id: None,
2161        };
2162
2163        let response = self.inner.update_position_leverage(params).await?;
2164
2165        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2166        let ts_init = self.generate_ts_init();
2167
2168        parse_position_report(response, &instrument, ts_init)
2169    }
2170}
2171
2172////////////////////////////////////////////////////////////////////////////////
2173// Tests
2174////////////////////////////////////////////////////////////////////////////////
2175
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Builds an accepted GTC limit buy report on `XBTUSD.BITMEX` with the given IDs,
    /// contingency type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        // The order list ID is applied first (when present), then the contingency type
        let with_list = match order_list_id {
            Some(id) => base.with_order_list_id(OrderListId::from(id)),
            None => base,
        };

        with_list.with_contingency_type(contingency_type)
    }

    /// Wraps client order ID strings in the `Some(vec![..])` shape of `linked_order_ids`.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let client = BitmexRawHttpClient::with_credentials(
            String::from("test_api_key"),
            String::from("test_api_secret"),
            String::from("http://localhost:8080"),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create test client");

        let signed = client
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        // All three authentication headers must be present
        for header in ["api-key", "api-signature", "api-expires"] {
            assert!(signed.contains_key(header));
        }
        assert_eq!(signed.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let client = BitmexRawHttpClient::with_credentials(
            String::from("test_api_key"),
            String::from("test_api_secret"),
            String::from("http://localhost:8080"),
            Some(60),
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create test client");

        let payload = serde_json::to_vec(&json!({"symbol": "XBTUSD", "orderQty": 100})).unwrap();

        let bare = client
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let with_payload = client
            .sign_request(&Method::POST, "/api/v1/order", Some(&payload))
            .unwrap();

        // Including the body must change the signature
        let bare_signature = bare.get("api-signature").unwrap();
        let payload_signature = with_payload.get("api-signature").unwrap();
        assert_ne!(bare_signature, payload_signature);
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        // Identical clients apart from the receive window
        let make_client = |recv_window_ms| {
            BitmexRawHttpClient::with_credentials(
                "test_api_key".to_string(),
                "test_api_secret".to_string(),
                "http://localhost:8080".to_string(),
                Some(60),
                None, // max_retries
                None, // retry_delay_ms
                None, // retry_delay_max_ms
                recv_window_ms,
                None, // max_requests_per_second
                None, // max_requests_per_minute
                None, // proxy_url
            )
            .expect("Failed to create test client")
        };

        let client_default = make_client(None); // Default recv_window_ms (10000ms = 10s)
        let client_custom = make_client(Some(30_000)); // 30 seconds

        let headers_default = client_default
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        // Parse expires timestamps
        let expires_default = headers_default
            .get("api-expires")
            .unwrap()
            .parse::<i64>()
            .unwrap();
        let expires_custom = headers_custom
            .get("api-expires")
            .unwrap()
            .parse::<i64>()
            .unwrap();

        // Both expiries must lie in the future
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        // The custom window must expire later than the default one
        assert!(expires_custom > expires_default);

        // Expect roughly 20 seconds between them (30s - 10s), with extra tolerance for
        // delays between the two signing calls on slow CI runners
        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Each order links to the other two members of its order list
        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // No order list ID: linking has to come from the shared client order ID prefix
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Non-contingent orders should not be linked
        let non_contingent = &reports[0];
        assert!(non_contingent.linked_order_ids.is_none());

        // A contingent order with no other contingent peers should have contingency reset
        let lone_contingent = &reports[1];
        assert!(lone_contingent.linked_order_ids.is_none());
        assert_eq!(
            lone_contingent.contingency_type,
            ContingencyType::NoContingency
        );
    }
}