Skip to main content

nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock, RwLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    UUID4, UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46        PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65    error::{BitmexErrorResponse, BitmexHttpError},
66    models::{
67        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69    },
70    query::{
71        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74        PostPositionLeverageParams, PutOrderParams,
75    },
76};
77use crate::{
78    common::{
79        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80        credential::Credential,
81        enums::{
82            BitmexContingencyType, BitmexExecInstruction, BitmexOrderStatus, BitmexOrderType,
83            BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
84        },
85        parse::{parse_account_balance, quantity_to_u32},
86    },
87    http::{
88        parse::{
89            InstrumentParseResult, parse_fill_report, parse_instrument_any,
90            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
91        },
92        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
93    },
94    websocket::messages::BitmexMarginMsg,
95};
96
97/// Default BitMEX REST API rate limits.
98///
99/// BitMEX implements a dual-layer rate limiting system:
100/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
101/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
102const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
103const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
104const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
105
106const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
107const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
108
109static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
110    vec![
111        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
112        Ustr::from(BITMEX_MINUTE_RATE_KEY),
113    ]
114});
115
116/// Represents a BitMEX HTTP response.
117#[derive(Debug, Serialize, Deserialize)]
118pub struct BitmexResponse<T> {
119    /// The typed data returned by the BitMEX endpoint.
120    pub data: Vec<T>,
121}
122
123/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
124///
125/// This client wraps the underlying [`HttpClient`] to handle functionality
126/// specific to BitMEX, such as request signing (for authenticated endpoints),
127/// forming request URLs, and deserializing responses into specific data models.
128///
129/// # Connection Management
130///
131/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
132/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
133/// reused for subsequent requests to minimize latency.
134///
135/// # Rate Limiting
136///
137/// BitMEX enforces the following rate limits:
138/// - 120 requests per minute for authenticated users (30 for unauthenticated).
139/// - 10 requests per second burst limit for certain endpoints (order management).
140///
141/// The client automatically respects these limits through the configured quota.
142#[derive(Debug, Clone)]
143pub struct BitmexRawHttpClient {
144    base_url: String,
145    client: HttpClient,
146    credential: Option<Credential>,
147    recv_window_ms: u64,
148    retry_manager: RetryManager<BitmexHttpError>,
149    cancellation_token: Arc<RwLock<CancellationToken>>,
150}
151
152impl Default for BitmexRawHttpClient {
153    fn default() -> Self {
154        Self::new(None, Some(60), None, None, None, None, None, None, None)
155            .expect("Failed to create default BitmexHttpInnerClient")
156    }
157}
158
159impl BitmexRawHttpClient {
160    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
161    /// optionally overridden with a custom base URL.
162    ///
163    /// This version of the client has **no credentials**, so it can only
164    /// call publicly accessible endpoints.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the retry manager cannot be created.
169    #[allow(clippy::too_many_arguments)]
170    pub fn new(
171        base_url: Option<String>,
172        timeout_secs: Option<u64>,
173        max_retries: Option<u32>,
174        retry_delay_ms: Option<u64>,
175        retry_delay_max_ms: Option<u64>,
176        recv_window_ms: Option<u64>,
177        max_requests_per_second: Option<u32>,
178        max_requests_per_minute: Option<u32>,
179        proxy_url: Option<String>,
180    ) -> Result<Self, BitmexHttpError> {
181        let retry_config = RetryConfig {
182            max_retries: max_retries.unwrap_or(3),
183            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
184            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
185            backoff_factor: 2.0,
186            jitter_ms: 1000,
187            operation_timeout_ms: Some(60_000),
188            immediate_first: false,
189            max_elapsed_ms: Some(180_000),
190        };
191
192        let retry_manager = RetryManager::new(retry_config);
193
194        let max_req_per_sec =
195            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
196        let max_req_per_min =
197            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
198
199        Ok(Self {
200            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
201            client: HttpClient::new(
202                Self::default_headers(),
203                vec![],
204                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
205                Some(Self::default_quota(max_req_per_sec)),
206                timeout_secs,
207                proxy_url,
208            )
209            .map_err(|e| {
210                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
211            })?,
212            credential: None,
213            recv_window_ms: recv_window_ms.unwrap_or(10_000),
214            retry_manager,
215            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
216        })
217    }
218
219    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
220    /// for authenticated requests, optionally using a custom base URL.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the retry manager cannot be created.
225    #[allow(clippy::too_many_arguments)]
226    pub fn with_credentials(
227        api_key: String,
228        api_secret: String,
229        base_url: String,
230        timeout_secs: Option<u64>,
231        max_retries: Option<u32>,
232        retry_delay_ms: Option<u64>,
233        retry_delay_max_ms: Option<u64>,
234        recv_window_ms: Option<u64>,
235        max_requests_per_second: Option<u32>,
236        max_requests_per_minute: Option<u32>,
237        proxy_url: Option<String>,
238    ) -> Result<Self, BitmexHttpError> {
239        let retry_config = RetryConfig {
240            max_retries: max_retries.unwrap_or(3),
241            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243            backoff_factor: 2.0,
244            jitter_ms: 1000,
245            operation_timeout_ms: Some(60_000),
246            immediate_first: false,
247            max_elapsed_ms: Some(180_000),
248        };
249
250        let retry_manager = RetryManager::new(retry_config);
251
252        let max_req_per_sec =
253            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
254        let max_req_per_min =
255            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
256
257        Ok(Self {
258            base_url,
259            client: HttpClient::new(
260                Self::default_headers(),
261                vec![],
262                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
263                Some(Self::default_quota(max_req_per_sec)),
264                timeout_secs,
265                proxy_url,
266            )
267            .map_err(|e| {
268                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
269            })?,
270            credential: Some(Credential::new(api_key, api_secret)),
271            recv_window_ms: recv_window_ms.unwrap_or(10_000),
272            retry_manager,
273            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
274        })
275    }
276
277    fn default_headers() -> HashMap<String, String> {
278        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
279    }
280
281    fn default_quota(max_requests_per_second: u32) -> Quota {
282        Quota::per_second(
283            NonZeroU32::new(max_requests_per_second)
284                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
285        )
286    }
287
288    fn rate_limiter_quotas(
289        max_requests_per_second: u32,
290        max_requests_per_minute: u32,
291    ) -> Vec<(String, Quota)> {
292        let per_sec_quota = Quota::per_second(
293            NonZeroU32::new(max_requests_per_second)
294                .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
295        );
296        let per_min_quota =
297            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
298                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
299            }));
300
301        vec![
302            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
303            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
304        ]
305    }
306
307    fn rate_limit_keys() -> Vec<Ustr> {
308        RATE_LIMIT_KEYS.clone()
309    }
310
311    /// Cancel all pending HTTP requests.
312    ///
313    /// # Panics
314    ///
315    /// Panics if the cancellation token lock is poisoned.
316    pub fn cancel_all_requests(&self) {
317        self.cancellation_token
318            .read()
319            .expect("cancellation token lock poisoned")
320            .cancel();
321    }
322
323    /// Replace the cancellation token so new requests can proceed.
324    ///
325    /// # Panics
326    ///
327    /// Panics if the cancellation token lock is poisoned.
328    pub fn reset_cancellation_token(&self) {
329        *self
330            .cancellation_token
331            .write()
332            .expect("cancellation token lock poisoned") = CancellationToken::new();
333    }
334
335    /// Get a clone of the cancellation token for this client.
336    ///
337    /// # Panics
338    ///
339    /// Panics if the cancellation token lock is poisoned.
340    pub fn cancellation_token(&self) -> CancellationToken {
341        self.cancellation_token
342            .read()
343            .expect("cancellation token lock poisoned")
344            .clone()
345    }
346
347    fn sign_request(
348        &self,
349        method: &Method,
350        endpoint: &str,
351        body: Option<&[u8]>,
352    ) -> Result<HashMap<String, String>, BitmexHttpError> {
353        let credential = self
354            .credential
355            .as_ref()
356            .ok_or(BitmexHttpError::MissingCredentials)?;
357
358        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
359        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
360
361        let full_path = if endpoint.starts_with("/api/v1") {
362            endpoint.to_string()
363        } else {
364            format!("/api/v1{endpoint}")
365        };
366
367        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
368
369        let mut headers = HashMap::new();
370        headers.insert("api-expires".to_string(), expires.to_string());
371        headers.insert("api-key".to_string(), credential.api_key.to_string());
372        headers.insert("api-signature".to_string(), signature);
373
374        // Add Content-Type header for form-encoded body
375        if body.is_some()
376            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
377        {
378            headers.insert(
379                "Content-Type".to_string(),
380                "application/x-www-form-urlencoded".to_string(),
381            );
382        }
383
384        Ok(headers)
385    }
386
387    async fn send_request<T: DeserializeOwned, P: Serialize>(
388        &self,
389        method: Method,
390        endpoint: &str,
391        params: Option<&P>,
392        body: Option<Vec<u8>>,
393        authenticate: bool,
394    ) -> Result<T, BitmexHttpError> {
395        let endpoint = endpoint.to_string();
396        let method_clone = method.clone();
397        let body_clone = body.clone();
398
399        // Serialize params before closure to avoid reference lifetime issues
400        // Query params are used with GET and DELETE methods
401        let params_str = if method == Method::GET || method == Method::DELETE {
402            params
403                .map(serde_urlencoded::to_string)
404                .transpose()
405                .map_err(|e| {
406                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
407                })?
408        } else {
409            None
410        };
411
412        let full_endpoint = match params_str {
413            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
414            _ => endpoint.clone(),
415        };
416
417        let url = format!("{}{}", self.base_url, full_endpoint);
418
419        let operation = || {
420            let url = url.clone();
421            let method = method_clone.clone();
422            let body = body_clone.clone();
423            let full_endpoint = full_endpoint.clone();
424
425            async move {
426                let headers = if authenticate {
427                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
428                } else {
429                    None
430                };
431
432                let rate_keys = Self::rate_limit_keys();
433                let resp = self
434                    .client
435                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
436                    .await?;
437
438                if resp.status.is_success() {
439                    serde_json::from_slice(&resp.body).map_err(Into::into)
440                } else if let Ok(error_resp) =
441                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
442                {
443                    Err(error_resp.into())
444                } else {
445                    Err(BitmexHttpError::UnexpectedStatus {
446                        status: StatusCode::from_u16(resp.status.as_u16())
447                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
448                        body: String::from_utf8_lossy(&resp.body).to_string(),
449                    })
450                }
451            }
452        };
453
454        // Retry strategy based on BitMEX error responses and HTTP status codes:
455        //
456        // 1. Network errors: always retry (transient connection issues).
457        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
458        // 3. BitMEX JSON errors with specific handling:
459        //    - "RateLimitError": explicit rate limit error from BitMEX.
460        //    - "HTTPError": generic error name used by BitMEX for various issues
461        //      Only retry if message contains "rate limit" to avoid retrying
462        //      non-transient errors like authentication failures, validation errors,
463        //      insufficient balance, etc. which also return as "HTTPError".
464        //
465        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
466        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
467        // be retried. We only retry when the message explicitly mentions rate limiting.
468        //
469        // See tests in tests/http.rs for retry behavior validation.
470        let should_retry = |error: &BitmexHttpError| -> bool {
471            match error {
472                BitmexHttpError::NetworkError(_) => true,
473                BitmexHttpError::UnexpectedStatus { status, .. } => {
474                    status.as_u16() >= 500 || status.as_u16() == 429
475                }
476                BitmexHttpError::BitmexError {
477                    error_name,
478                    message,
479                } => {
480                    error_name == "RateLimitError"
481                        || (error_name == "HTTPError"
482                            && message.to_lowercase().contains("rate limit"))
483                }
484                _ => false,
485            }
486        };
487
488        let create_error = |msg: String| -> BitmexHttpError {
489            if msg == "canceled" {
490                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
491            } else {
492                BitmexHttpError::NetworkError(msg)
493            }
494        };
495
496        let cancel_token = self.cancellation_token();
497
498        self.retry_manager
499            .execute_with_retry_with_cancel(
500                endpoint.as_str(),
501                operation,
502                should_retry,
503                create_error,
504                &cancel_token,
505            )
506            .await
507    }
508
509    /// Get all instruments.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
514    pub async fn get_instruments(
515        &self,
516        active_only: bool,
517    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
518        let path = if active_only {
519            "/instrument/active"
520        } else {
521            "/instrument"
522        };
523        self.send_request::<_, ()>(Method::GET, path, None, None, false)
524            .await
525    }
526
527    /// Requests the current server time from BitMEX.
528    ///
529    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
530    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if the HTTP request fails or if the response body
535    /// cannot be parsed into [`BitmexApiInfo`].
536    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
537        let response: BitmexApiInfo = self
538            .send_request::<_, ()>(Method::GET, "", None, None, false)
539            .await?;
540        Ok(response.timestamp)
541    }
542
543    /// Get the instrument definition for the specified symbol.
544    ///
545    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
546    /// a single symbol is requested. This helper returns the first element of
547    /// that array and yields `Ok(None)` when the venue returns an empty list
548    /// (e.g. unknown symbol).
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the request fails or the payload cannot be deserialized.
553    pub async fn get_instrument(
554        &self,
555        symbol: &str,
556    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
557        let path = &format!("/instrument?symbol={symbol}");
558        let instruments: Vec<BitmexInstrument> = self
559            .send_request::<_, ()>(Method::GET, path, None, None, false)
560            .await?;
561
562        Ok(instruments.into_iter().next())
563    }
564
565    /// Get user wallet information.
566    ///
567    /// # Errors
568    ///
569    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
570    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
571        let endpoint = "/user/wallet";
572        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
573            .await
574    }
575
576    /// Get user margin information for a specific currency.
577    ///
578    /// # Errors
579    ///
580    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
581    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
582        let path = format!("/user/margin?currency={currency}");
583        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
584            .await
585    }
586
587    /// Get user margin information for all currencies.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
592    pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
593        self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
594            .await
595    }
596
597    /// Get historical trades.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
602    ///
603    /// # Panics
604    ///
605    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
606    pub async fn get_trades(
607        &self,
608        params: GetTradeParams,
609    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
610        self.send_request(Method::GET, "/trade", Some(&params), None, true)
611            .await
612    }
613
614    /// Get bucketed (aggregated) trade data.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
619    pub async fn get_trade_bucketed(
620        &self,
621        params: GetTradeBucketedParams,
622    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
623        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
624            .await
625    }
626
627    /// Get user orders.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
632    ///
633    /// # Panics
634    ///
635    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
636    pub async fn get_orders(
637        &self,
638        params: GetOrderParams,
639    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
640        self.send_request(Method::GET, "/order", Some(&params), None, true)
641            .await
642    }
643
644    /// Place a new order.
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
649    ///
650    /// # Panics
651    ///
652    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
653    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
654        // BitMEX spec requires form-encoded body for POST /order
655        let body = serde_urlencoded::to_string(&params)
656            .map_err(|e| {
657                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
658            })?
659            .into_bytes();
660        let path = "/order";
661        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
662            .await
663    }
664
665    /// Cancel user orders.
666    ///
667    /// # Errors
668    ///
669    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
670    ///
671    /// # Panics
672    ///
673    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
674    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
675        // BitMEX spec requires form-encoded body for DELETE /order
676        let body = serde_urlencoded::to_string(&params)
677            .map_err(|e| {
678                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
679            })?
680            .into_bytes();
681        let path = "/order";
682        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
683            .await
684    }
685
686    /// Amend an existing order.
687    ///
688    /// # Errors
689    ///
690    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
691    ///
692    /// # Panics
693    ///
694    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
695    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
696        // BitMEX spec requires form-encoded body for PUT /order
697        let body = serde_urlencoded::to_string(&params)
698            .map_err(|e| {
699                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700            })?
701            .into_bytes();
702        let path = "/order";
703        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
704            .await
705    }
706
707    /// Cancel all orders.
708    ///
709    /// # Errors
710    ///
711    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
712    ///
713    /// # Panics
714    ///
715    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
716    ///
717    /// # References
718    ///
719    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
720    pub async fn cancel_all_orders(
721        &self,
722        params: DeleteAllOrdersParams,
723    ) -> Result<Value, BitmexHttpError> {
724        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
725            .await
726    }
727
728    /// Get user executions.
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
733    ///
734    /// # Panics
735    ///
736    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
737    pub async fn get_executions(
738        &self,
739        params: GetExecutionParams,
740    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
741        let query = serde_urlencoded::to_string(&params).map_err(|e| {
742            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
743        })?;
744        let path = format!("/execution/tradeHistory?{query}");
745        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
746            .await
747    }
748
749    /// Get user positions.
750    ///
751    /// # Errors
752    ///
753    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
754    ///
755    /// # Panics
756    ///
757    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
758    pub async fn get_positions(
759        &self,
760        params: GetPositionParams,
761    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
762        self.send_request(Method::GET, "/position", Some(&params), None, true)
763            .await
764    }
765
766    /// Update position leverage.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
771    ///
772    /// # Panics
773    ///
774    /// Panics if the parameters cannot be serialized (should never happen with valid builder-generated params).
775    pub async fn update_position_leverage(
776        &self,
777        params: PostPositionLeverageParams,
778    ) -> Result<BitmexPosition, BitmexHttpError> {
779        // BitMEX spec requires form-encoded body for POST endpoints
780        let body = serde_urlencoded::to_string(&params)
781            .map_err(|e| {
782                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
783            })?
784            .into_bytes();
785        let path = "/position/leverage";
786        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
787            .await
788    }
789}
790
791/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
792///
793/// This is the high-level client that wraps the inner client and provides
794/// Nautilus-specific functionality for trading operations.
795#[derive(Debug)]
796#[cfg_attr(
797    feature = "python",
798    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
799)]
800pub struct BitmexHttpClient {
801    inner: Arc<BitmexRawHttpClient>,
802    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
803    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
804    cache_initialized: AtomicBool,
805}
806
807impl Clone for BitmexHttpClient {
808    fn clone(&self) -> Self {
809        let cache_initialized = AtomicBool::new(false);
810
811        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
812        if is_initialized {
813            cache_initialized.store(true, Ordering::Release);
814        }
815
816        Self {
817            inner: self.inner.clone(),
818            instruments_cache: self.instruments_cache.clone(),
819            order_type_cache: self.order_type_cache.clone(),
820            cache_initialized,
821        }
822    }
823}
824
825impl Default for BitmexHttpClient {
826    fn default() -> Self {
827        Self::new(
828            None,
829            None,
830            None,
831            false,
832            Some(60),
833            None,
834            None,
835            None,
836            None,
837            None,
838            None,
839            None, // proxy_url
840        )
841        .expect("Failed to create default BitmexHttpClient")
842    }
843}
844
845impl BitmexHttpClient {
846    /// Creates a new [`BitmexHttpClient`] instance.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the HTTP client cannot be created.
851    #[allow(clippy::too_many_arguments)]
852    pub fn new(
853        base_url: Option<String>,
854        api_key: Option<String>,
855        api_secret: Option<String>,
856        testnet: bool,
857        timeout_secs: Option<u64>,
858        max_retries: Option<u32>,
859        retry_delay_ms: Option<u64>,
860        retry_delay_max_ms: Option<u64>,
861        recv_window_ms: Option<u64>,
862        max_requests_per_second: Option<u32>,
863        max_requests_per_minute: Option<u32>,
864        proxy_url: Option<String>,
865    ) -> Result<Self, BitmexHttpError> {
866        // Determine the base URL
867        let url = base_url.unwrap_or_else(|| {
868            if testnet {
869                BITMEX_HTTP_TESTNET_URL.to_string()
870            } else {
871                BITMEX_HTTP_URL.to_string()
872            }
873        });
874
875        let inner = match (api_key, api_secret) {
876            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
877                key,
878                secret,
879                url,
880                timeout_secs,
881                max_retries,
882                retry_delay_ms,
883                retry_delay_max_ms,
884                recv_window_ms,
885                max_requests_per_second,
886                max_requests_per_minute,
887                proxy_url,
888            )?,
889            (Some(_), None) | (None, Some(_)) => {
890                return Err(BitmexHttpError::ValidationError(
891                    "Both api_key and api_secret must be provided, or neither".to_string(),
892                ));
893            }
894            (None, None) => BitmexRawHttpClient::new(
895                Some(url),
896                timeout_secs,
897                max_retries,
898                retry_delay_ms,
899                retry_delay_max_ms,
900                recv_window_ms,
901                max_requests_per_second,
902                max_requests_per_minute,
903                proxy_url,
904            )?,
905        };
906
907        Ok(Self {
908            inner: Arc::new(inner),
909            instruments_cache: Arc::new(DashMap::new()),
910            order_type_cache: Arc::new(DashMap::new()),
911            cache_initialized: AtomicBool::new(false),
912        })
913    }
914
915    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
916    /// the default BitMEX HTTP base URL.
917    ///
918    /// # Errors
919    ///
920    /// Returns an error if required environment variables are not set or invalid.
921    pub fn from_env() -> anyhow::Result<Self> {
922        Self::with_credentials(
923            None, None, None, None, None, None, None, None, None, None, None,
924        )
925        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
926    }
927
928    /// Creates a new [`BitmexHttpClient`] configured with credentials
929    /// for authenticated requests.
930    ///
931    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
932    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if one credential is provided without the other.
937    #[allow(clippy::too_many_arguments)]
938    pub fn with_credentials(
939        api_key: Option<String>,
940        api_secret: Option<String>,
941        base_url: Option<String>,
942        timeout_secs: Option<u64>,
943        max_retries: Option<u32>,
944        retry_delay_ms: Option<u64>,
945        retry_delay_max_ms: Option<u64>,
946        recv_window_ms: Option<u64>,
947        max_requests_per_second: Option<u32>,
948        max_requests_per_minute: Option<u32>,
949        proxy_url: Option<String>,
950    ) -> anyhow::Result<Self> {
951        // Determine testnet from URL first to select correct environment variables
952        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
953
954        // Choose environment variables based on testnet flag
955        let (key_var, secret_var) = if testnet {
956            ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
957        } else {
958            ("BITMEX_API_KEY", "BITMEX_API_SECRET")
959        };
960
961        let api_key = get_or_env_var_opt(api_key, key_var);
962        let api_secret = get_or_env_var_opt(api_secret, secret_var);
963
964        // If we're trying to create an authenticated client, we need both key and secret
965        if api_key.is_some() && api_secret.is_none() {
966            anyhow::bail!("{secret_var} is required when {key_var} is provided");
967        }
968        if api_key.is_none() && api_secret.is_some() {
969            anyhow::bail!("{key_var} is required when {secret_var} is provided");
970        }
971
972        Self::new(
973            base_url,
974            api_key,
975            api_secret,
976            testnet,
977            timeout_secs,
978            max_retries,
979            retry_delay_ms,
980            retry_delay_max_ms,
981            recv_window_ms,
982            max_requests_per_second,
983            max_requests_per_minute,
984            proxy_url,
985        )
986        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
987    }
988
989    /// Returns the base url being used by the client.
990    #[must_use]
991    pub fn base_url(&self) -> &str {
992        self.inner.base_url.as_str()
993    }
994
995    /// Returns the public API key being used by the client.
996    #[must_use]
997    pub fn api_key(&self) -> Option<&str> {
998        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
999    }
1000
1001    /// Returns a masked version of the API key for logging purposes.
1002    #[must_use]
1003    pub fn api_key_masked(&self) -> Option<String> {
1004        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1005    }
1006
1007    /// Requests the current server time from BitMEX.
1008    ///
1009    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
1010    ///
1011    /// # Errors
1012    ///
1013    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1014    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1015        self.inner.get_server_time().await
1016    }
1017
1018    /// Generates a timestamp for initialization.
1019    fn generate_ts_init(&self) -> UnixNanos {
1020        get_atomic_clock_realtime().get_time_ns()
1021    }
1022
1023    /// Check if the order has a contingency type that requires linking.
1024    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1025        matches!(
1026            contingency_type,
1027            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1028        )
1029    }
1030
1031    /// Check if the order is a parent in contingency relationships.
1032    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1033        matches!(
1034            contingency_type,
1035            ContingencyType::Oco | ContingencyType::Oto
1036        )
1037    }
1038
1039    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
1040    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
1041        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
1042        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
1043        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
1044        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
1045
1046        for report in reports.iter() {
1047            let Some(client_order_id) = report.client_order_id else {
1048                continue;
1049            };
1050
1051            if let Some(order_list_id) = report.order_list_id {
1052                order_list_groups
1053                    .entry(order_list_id)
1054                    .or_default()
1055                    .push(client_order_id);
1056
1057                if Self::is_parent_contingency(report.contingency_type) {
1058                    order_list_parents
1059                        .entry(order_list_id)
1060                        .or_insert(client_order_id);
1061                }
1062            }
1063
1064            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1065                && Self::is_contingent_order(report.contingency_type)
1066            {
1067                prefix_groups
1068                    .entry(base.to_owned())
1069                    .or_default()
1070                    .push(client_order_id);
1071
1072                if Self::is_parent_contingency(report.contingency_type) {
1073                    prefix_parents
1074                        .entry(base.to_owned())
1075                        .or_insert(client_order_id);
1076                }
1077            }
1078        }
1079
1080        for report in reports.iter_mut() {
1081            let Some(client_order_id) = report.client_order_id else {
1082                continue;
1083            };
1084
1085            if report.linked_order_ids.is_some() {
1086                continue;
1087            }
1088
1089            // Only process contingent orders
1090            if !Self::is_contingent_order(report.contingency_type) {
1091                continue;
1092            }
1093
1094            if let Some(order_list_id) = report.order_list_id
1095                && let Some(group) = order_list_groups.get(&order_list_id)
1096            {
1097                let mut linked: Vec<ClientOrderId> = group
1098                    .iter()
1099                    .copied()
1100                    .filter(|candidate| candidate != &client_order_id)
1101                    .collect();
1102
1103                if !linked.is_empty() {
1104                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1105                        if client_order_id == *parent_id {
1106                            report.parent_order_id = None;
1107                        } else {
1108                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1109                            report.parent_order_id = Some(*parent_id);
1110                        }
1111                    } else {
1112                        report.parent_order_id = None;
1113                    }
1114
1115                    log::trace!(
1116                        "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
1117                        client_order_id,
1118                        order_list_id,
1119                        report.contingency_type,
1120                        linked,
1121                    );
1122                    report.linked_order_ids = Some(linked);
1123                    continue;
1124                }
1125
1126                log::trace!(
1127                    "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
1128                    client_order_id,
1129                    order_list_id,
1130                    report.contingency_type,
1131                    group,
1132                );
1133                report.parent_order_id = None;
1134            } else if report.order_list_id.is_none() {
1135                report.parent_order_id = None;
1136            }
1137
1138            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1139                && let Some(group) = prefix_groups.get(base)
1140            {
1141                let mut linked: Vec<ClientOrderId> = group
1142                    .iter()
1143                    .copied()
1144                    .filter(|candidate| candidate != &client_order_id)
1145                    .collect();
1146
1147                if !linked.is_empty() {
1148                    if let Some(parent_id) = prefix_parents.get(base) {
1149                        if client_order_id == *parent_id {
1150                            report.parent_order_id = None;
1151                        } else {
1152                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1153                            report.parent_order_id = Some(*parent_id);
1154                        }
1155                    } else {
1156                        report.parent_order_id = None;
1157                    }
1158
1159                    log::trace!(
1160                        "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
1161                        client_order_id,
1162                        report.contingency_type,
1163                        base,
1164                        linked,
1165                    );
1166                    report.linked_order_ids = Some(linked);
1167                    continue;
1168                }
1169
1170                log::trace!(
1171                    "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
1172                    client_order_id,
1173                    report.contingency_type,
1174                    base,
1175                    group,
1176                );
1177                report.parent_order_id = None;
1178            } else if client_order_id.as_str().contains('-') {
1179                report.parent_order_id = None;
1180            }
1181
1182            if Self::is_contingent_order(report.contingency_type) {
1183                log::warn!(
1184                    "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
1185                    report.client_order_id,
1186                    report.order_list_id,
1187                    report.contingency_type,
1188                );
1189                report.contingency_type = ContingencyType::NoContingency;
1190                report.parent_order_id = None;
1191            }
1192
1193            report.linked_order_ids = None;
1194        }
1195    }
1196
1197    /// Cancel all pending HTTP requests.
1198    pub fn cancel_all_requests(&self) {
1199        self.inner.cancel_all_requests();
1200    }
1201
1202    /// Replace the cancellation token so new requests can proceed.
1203    pub fn reset_cancellation_token(&self) {
1204        self.inner.reset_cancellation_token();
1205    }
1206
1207    /// Get a clone of the cancellation token for this client.
1208    pub fn cancellation_token(&self) -> CancellationToken {
1209        self.inner.cancellation_token()
1210    }
1211
1212    /// Caches a single instrument.
1213    ///
1214    /// Any existing instrument with the same symbol will be replaced.
1215    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1216        self.instruments_cache
1217            .insert(instrument.raw_symbol().inner(), instrument);
1218        self.cache_initialized.store(true, Ordering::Release);
1219    }
1220
1221    /// Gets an instrument from the cache by symbol.
1222    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1223        self.instruments_cache
1224            .get(symbol)
1225            .map(|entry| entry.value().clone())
1226    }
1227
1228    /// Request a single instrument and parse it into a Nautilus type.
1229    ///
1230    /// # Errors
1231    ///
1232    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1233    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1234    /// cannot be converted into a Nautilus `Instrument`.
1235    pub async fn request_instrument(
1236        &self,
1237        instrument_id: InstrumentId,
1238    ) -> anyhow::Result<Option<InstrumentAny>> {
1239        let response = self
1240            .inner
1241            .get_instrument(instrument_id.symbol.as_str())
1242            .await?;
1243
1244        let instrument = match response {
1245            Some(instrument) => instrument,
1246            None => return Ok(None),
1247        };
1248
1249        let ts_init = self.generate_ts_init();
1250
1251        match parse_instrument_any(&instrument, ts_init) {
1252            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1253            InstrumentParseResult::Unsupported {
1254                symbol,
1255                instrument_type,
1256            } => {
1257                log::debug!(
1258                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1259                );
1260                Ok(None)
1261            }
1262            InstrumentParseResult::Inactive { symbol, state } => {
1263                log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1264                Ok(None)
1265            }
1266            InstrumentParseResult::Failed {
1267                symbol,
1268                instrument_type,
1269                error,
1270            } => {
1271                log::error!(
1272                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1273                );
1274                Ok(None)
1275            }
1276        }
1277    }
1278
1279    /// Request all available instruments and parse them into Nautilus types.
1280    ///
1281    /// # Errors
1282    ///
1283    /// Returns an error if the HTTP request fails or parsing fails.
1284    pub async fn request_instruments(
1285        &self,
1286        active_only: bool,
1287    ) -> anyhow::Result<Vec<InstrumentAny>> {
1288        let instruments = self.inner.get_instruments(active_only).await?;
1289        let ts_init = self.generate_ts_init();
1290
1291        let mut parsed_instruments = Vec::new();
1292        let mut skipped_count = 0;
1293        let mut inactive_count = 0;
1294        let mut failed_count = 0;
1295        let total_count = instruments.len();
1296
1297        for inst in instruments {
1298            match parse_instrument_any(&inst, ts_init) {
1299                InstrumentParseResult::Ok(instrument_any) => {
1300                    parsed_instruments.push(*instrument_any);
1301                }
1302                InstrumentParseResult::Unsupported {
1303                    symbol,
1304                    instrument_type,
1305                } => {
1306                    skipped_count += 1;
1307                    log::debug!(
1308                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1309                    );
1310                }
1311                InstrumentParseResult::Inactive { symbol, state } => {
1312                    inactive_count += 1;
1313                    log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1314                }
1315                InstrumentParseResult::Failed {
1316                    symbol,
1317                    instrument_type,
1318                    error,
1319                } => {
1320                    failed_count += 1;
1321                    log::error!(
1322                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1323                    );
1324                }
1325            }
1326        }
1327
1328        if skipped_count > 0 {
1329            log::info!(
1330                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1331            );
1332        }
1333
1334        if inactive_count > 0 {
1335            log::info!(
1336                "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1337            );
1338        }
1339
1340        if failed_count > 0 {
1341            log::error!(
1342                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1343                parsed_instruments.len()
1344            );
1345        }
1346
1347        Ok(parsed_instruments)
1348    }
1349
1350    /// Get user wallet information.
1351    ///
1352    /// # Errors
1353    ///
1354    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1355    ///
1356    /// # Panics
1357    ///
1358    /// Panics if the inner mutex is poisoned.
1359    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1360        let inner = self.inner.clone();
1361        inner.get_wallet().await
1362    }
1363
1364    /// Get user orders.
1365    ///
1366    /// # Errors
1367    ///
1368    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1369    ///
1370    /// # Panics
1371    ///
1372    /// Panics if the inner mutex is poisoned.
1373    pub async fn get_orders(
1374        &self,
1375        params: GetOrderParams,
1376    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1377        let inner = self.inner.clone();
1378        inner.get_orders(params).await
1379    }
1380
1381    /// Get instrument from the instruments cache (if found).
1382    ///
1383    /// # Errors
1384    ///
1385    /// Returns an error if the instrument is not found in the cache.
1386    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1387        self.get_instrument(&symbol).ok_or_else(|| {
1388            anyhow::anyhow!(
1389                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1390            )
1391        })
1392    }
1393
1394    /// Returns the cached price precision for the given symbol.
1395    ///
1396    /// # Errors
1397    ///
1398    /// Returns an error if the instrument was never cached (for example, if
1399    /// instruments were not loaded prior to use).
1400    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1401        self.instrument_from_cache(symbol)
1402            .map(|instrument| instrument.price_precision())
1403    }
1404
1405    /// Get user margin information for a specific currency.
1406    ///
1407    /// # Errors
1408    ///
1409    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1410    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1411        self.inner
1412            .get_margin(currency)
1413            .await
1414            .map_err(|e| anyhow::anyhow!(e))
1415    }
1416
1417    /// Get user margin information for all currencies.
1418    ///
1419    /// # Errors
1420    ///
1421    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1422    pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1423        self.inner
1424            .get_all_margins()
1425            .await
1426            .map_err(|e| anyhow::anyhow!(e))
1427    }
1428
1429    /// Request account state for the given account.
1430    ///
1431    /// # Errors
1432    ///
1433    /// Returns an error if the HTTP request fails or no account state is returned.
1434    pub async fn request_account_state(
1435        &self,
1436        account_id: AccountId,
1437    ) -> anyhow::Result<AccountState> {
1438        let margins = self
1439            .inner
1440            .get_all_margins()
1441            .await
1442            .map_err(|e| anyhow::anyhow!(e))?;
1443
1444        let ts_init =
1445            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1446
1447        let mut balances = Vec::with_capacity(margins.len());
1448        let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1449
1450        for margin in margins {
1451            if let Some(ts) = margin.timestamp {
1452                latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1453            }
1454
1455            let margin_msg = BitmexMarginMsg {
1456                account: margin.account,
1457                currency: margin.currency,
1458                risk_limit: margin.risk_limit,
1459                amount: margin.amount,
1460                prev_realised_pnl: margin.prev_realised_pnl,
1461                gross_comm: margin.gross_comm,
1462                gross_open_cost: margin.gross_open_cost,
1463                gross_open_premium: margin.gross_open_premium,
1464                gross_exec_cost: margin.gross_exec_cost,
1465                gross_mark_value: margin.gross_mark_value,
1466                risk_value: margin.risk_value,
1467                init_margin: margin.init_margin,
1468                maint_margin: margin.maint_margin,
1469                target_excess_margin: margin.target_excess_margin,
1470                realised_pnl: margin.realised_pnl,
1471                unrealised_pnl: margin.unrealised_pnl,
1472                wallet_balance: margin.wallet_balance,
1473                margin_balance: margin.margin_balance,
1474                margin_leverage: margin.margin_leverage,
1475                margin_used_pcnt: margin.margin_used_pcnt,
1476                excess_margin: margin.excess_margin,
1477                available_margin: margin.available_margin,
1478                withdrawable_margin: margin.withdrawable_margin,
1479                maker_fee_discount: None,
1480                taker_fee_discount: None,
1481                timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1482                foreign_margin_balance: None,
1483                foreign_requirement: None,
1484            };
1485
1486            balances.push(parse_account_balance(&margin_msg));
1487        }
1488
1489        if balances.is_empty() {
1490            anyhow::bail!("No margin data returned from BitMEX");
1491        }
1492
1493        let account_type = AccountType::Margin;
1494        let margins_vec = Vec::new();
1495        let is_reported = true;
1496        let event_id = UUID4::new();
1497
1498        // Use server timestamp if available, otherwise fall back to local time
1499        let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1500            UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1501        });
1502
1503        Ok(AccountState::new(
1504            account_id,
1505            account_type,
1506            balances,
1507            margins_vec,
1508            is_reported,
1509            event_id,
1510            ts_event,
1511            ts_init,
1512            None,
1513        ))
1514    }
1515
1516    /// Submit a new order.
1517    ///
1518    /// # Errors
1519    ///
1520    /// Returns an error if credentials are missing, the request fails, order validation fails,
1521    /// the order is rejected, or the API returns an error.
1522    #[allow(clippy::too_many_arguments)]
1523    pub async fn submit_order(
1524        &self,
1525        instrument_id: InstrumentId,
1526        client_order_id: ClientOrderId,
1527        order_side: OrderSide,
1528        order_type: OrderType,
1529        quantity: Quantity,
1530        time_in_force: TimeInForce,
1531        price: Option<Price>,
1532        trigger_price: Option<Price>,
1533        trigger_type: Option<TriggerType>,
1534        trailing_offset: Option<f64>,
1535        trailing_offset_type: Option<TrailingOffsetType>,
1536        display_qty: Option<Quantity>,
1537        post_only: bool,
1538        reduce_only: bool,
1539        order_list_id: Option<OrderListId>,
1540        contingency_type: Option<ContingencyType>,
1541        peg_price_type: Option<BitmexPegPriceType>,
1542        peg_offset_value: Option<f64>,
1543    ) -> anyhow::Result<OrderStatusReport> {
1544        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1545
1546        let mut params = super::query::PostOrderParamsBuilder::default();
1547        params.text(NAUTILUS_TRADER);
1548        params.symbol(instrument_id.symbol.as_str());
1549        params.cl_ord_id(client_order_id.as_str());
1550
1551        if order_side == OrderSide::NoOrderSide {
1552            anyhow::bail!("Order side must be Buy or Sell");
1553        }
1554        let side = BitmexSide::from(order_side.as_specified());
1555        params.side(side);
1556
1557        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1558        params.ord_type(ord_type);
1559
1560        params.order_qty(quantity_to_u32(&quantity, &instrument));
1561
1562        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1563        params.time_in_force(tif);
1564
1565        if let Some(price) = price {
1566            params.price(price.as_f64());
1567        }
1568
1569        if let Some(trigger_price) = trigger_price {
1570            params.stop_px(trigger_price.as_f64());
1571        }
1572
1573        if let Some(display_qty) = display_qty {
1574            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1575        }
1576
1577        if let Some(order_list_id) = order_list_id {
1578            params.cl_ord_link_id(order_list_id.as_str());
1579        }
1580
1581        let is_trailing_stop = matches!(
1582            order_type,
1583            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
1584        );
1585
1586        if is_trailing_stop && let Some(offset) = trailing_offset {
1587            if let Some(offset_type) = trailing_offset_type
1588                && offset_type != TrailingOffsetType::Price
1589            {
1590                anyhow::bail!(
1591                    "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
1592                );
1593            }
1594
1595            params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
1596
1597            // BitMEX requires negative offset for stop-sell orders
1598            let signed_offset = match order_side {
1599                OrderSide::Sell => -offset.abs(),
1600                OrderSide::Buy => offset.abs(),
1601                _ => offset,
1602            };
1603            params.peg_offset_value(signed_offset);
1604        }
1605
1606        // Pegged orders (BBO) via params override
1607        if peg_price_type.is_none() && peg_offset_value.is_some() {
1608            anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
1609        }
1610        if let Some(peg_type) = peg_price_type {
1611            if order_type != OrderType::Limit {
1612                anyhow::bail!(
1613                    "Pegged orders only supported for LIMIT order type, was {order_type:?}"
1614                );
1615            }
1616            params.ord_type(BitmexOrderType::Pegged);
1617            params.peg_price_type(peg_type);
1618            if let Some(offset) = peg_offset_value {
1619                params.peg_offset_value(offset);
1620            }
1621        }
1622
1623        let mut exec_inst = Vec::new();
1624
1625        if post_only {
1626            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1627        }
1628
1629        if reduce_only {
1630            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1631        }
1632
1633        // For trailing stops, trigger_type specifies which price to track (Mark, Last, Index)
1634        if (trigger_price.is_some() || is_trailing_stop)
1635            && let Some(trigger_type) = trigger_type
1636        {
1637            match trigger_type {
1638                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1639                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1640                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1641                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1642            }
1643        }
1644
1645        if !exec_inst.is_empty() {
1646            params.exec_inst(exec_inst);
1647        }
1648
1649        if let Some(contingency_type) = contingency_type {
1650            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1651            params.contingency_type(bitmex_contingency);
1652        }
1653
1654        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1655
1656        let response = self.inner.place_order(params).await?;
1657
1658        let order: BitmexOrder = serde_json::from_value(response)?;
1659
1660        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1661            let reason = order
1662                .ord_rej_reason
1663                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1664            anyhow::bail!("Order rejected: {reason}");
1665        }
1666
1667        // Cache order type for future lookups (e.g., cancel responses missing ord_type)
1668        self.order_type_cache.insert(client_order_id, order_type);
1669
1670        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1671        let ts_init = self.generate_ts_init();
1672
1673        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1674    }
1675
1676    /// Cancel an order.
1677    ///
1678    /// # Errors
1679    ///
1680    /// Returns an error if:
1681    /// - Credentials are missing.
1682    /// - The request fails.
1683    /// - The order doesn't exist.
1684    /// - The API returns an error.
1685    pub async fn cancel_order(
1686        &self,
1687        instrument_id: InstrumentId,
1688        client_order_id: Option<ClientOrderId>,
1689        venue_order_id: Option<VenueOrderId>,
1690    ) -> anyhow::Result<OrderStatusReport> {
1691        let mut params = super::query::DeleteOrderParamsBuilder::default();
1692        params.text(NAUTILUS_TRADER);
1693
1694        if let Some(venue_order_id) = venue_order_id {
1695            params.order_id(vec![venue_order_id.as_str().to_string()]);
1696        } else if let Some(client_order_id) = client_order_id {
1697            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1698        } else {
1699            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1700        }
1701
1702        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1703
1704        let response = self.inner.cancel_orders(params).await?;
1705
1706        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1707        let order = orders
1708            .into_iter()
1709            .next()
1710            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1711
1712        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1713        let ts_init = self.generate_ts_init();
1714
1715        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1716    }
1717
1718    /// Cancel multiple orders.
1719    ///
1720    /// # Errors
1721    ///
1722    /// Returns an error if:
1723    /// - Credentials are missing.
1724    /// - The request fails.
1725    /// - The order doesn't exist.
1726    /// - The API returns an error.
1727    pub async fn cancel_orders(
1728        &self,
1729        instrument_id: InstrumentId,
1730        client_order_ids: Option<Vec<ClientOrderId>>,
1731        venue_order_ids: Option<Vec<VenueOrderId>>,
1732    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1733        let mut params = super::query::DeleteOrderParamsBuilder::default();
1734        params.text(NAUTILUS_TRADER);
1735
1736        // BitMEX API requires either client order IDs or venue order IDs, not both
1737        // Prioritize venue order IDs if both are provided
1738        if let Some(venue_order_ids) = venue_order_ids {
1739            if venue_order_ids.is_empty() {
1740                anyhow::bail!("venue_order_ids cannot be empty");
1741            }
1742            params.order_id(
1743                venue_order_ids
1744                    .iter()
1745                    .map(|id| id.to_string())
1746                    .collect::<Vec<_>>(),
1747            );
1748        } else if let Some(client_order_ids) = client_order_ids {
1749            if client_order_ids.is_empty() {
1750                anyhow::bail!("client_order_ids cannot be empty");
1751            }
1752            params.cl_ord_id(
1753                client_order_ids
1754                    .iter()
1755                    .map(|id| id.to_string())
1756                    .collect::<Vec<_>>(),
1757            );
1758        } else {
1759            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1760        }
1761
1762        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1763
1764        let response = self.inner.cancel_orders(params).await?;
1765
1766        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1767
1768        let ts_init = self.generate_ts_init();
1769        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1770
1771        let mut reports = Vec::new();
1772
1773        for order in orders {
1774            reports.push(parse_order_status_report(
1775                &order,
1776                &instrument,
1777                &self.order_type_cache,
1778                ts_init,
1779            )?);
1780        }
1781
1782        Self::populate_linked_order_ids(&mut reports);
1783
1784        Ok(reports)
1785    }
1786
1787    /// Cancel all orders for an instrument and optionally an order side.
1788    ///
1789    /// # Errors
1790    ///
1791    /// Returns an error if:
1792    /// - Credentials are missing.
1793    /// - The request fails.
1794    /// - The order doesn't exist.
1795    /// - The API returns an error.
1796    pub async fn cancel_all_orders(
1797        &self,
1798        instrument_id: InstrumentId,
1799        order_side: Option<OrderSide>,
1800    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1801        let mut params = DeleteAllOrdersParamsBuilder::default();
1802        params.text(NAUTILUS_TRADER);
1803        params.symbol(instrument_id.symbol.as_str());
1804
1805        if let Some(side) = order_side {
1806            if side == OrderSide::NoOrderSide {
1807                log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1808            } else {
1809                let side = BitmexSide::from(side.as_specified());
1810                params.filter(serde_json::json!({
1811                    "side": side
1812                }));
1813            }
1814        }
1815
1816        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1817
1818        let response = self.inner.cancel_all_orders(params).await?;
1819
1820        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1821
1822        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1823        let ts_init = self.generate_ts_init();
1824
1825        let mut reports = Vec::new();
1826
1827        for order in orders {
1828            reports.push(parse_order_status_report(
1829                &order,
1830                &instrument,
1831                &self.order_type_cache,
1832                ts_init,
1833            )?);
1834        }
1835
1836        Self::populate_linked_order_ids(&mut reports);
1837
1838        Ok(reports)
1839    }
1840
1841    /// Modify an existing order.
1842    ///
1843    /// # Errors
1844    ///
1845    /// Returns an error if:
1846    /// - Credentials are missing.
1847    /// - The request fails.
1848    /// - The order doesn't exist.
1849    /// - The order is already closed.
1850    /// - The API returns an error.
1851    pub async fn modify_order(
1852        &self,
1853        instrument_id: InstrumentId,
1854        client_order_id: Option<ClientOrderId>,
1855        venue_order_id: Option<VenueOrderId>,
1856        quantity: Option<Quantity>,
1857        price: Option<Price>,
1858        trigger_price: Option<Price>,
1859    ) -> anyhow::Result<OrderStatusReport> {
1860        let mut params = PutOrderParamsBuilder::default();
1861        params.text(NAUTILUS_TRADER);
1862
1863        // Set order ID - prefer venue_order_id if available
1864        if let Some(venue_order_id) = venue_order_id {
1865            params.order_id(venue_order_id.as_str());
1866        } else if let Some(client_order_id) = client_order_id {
1867            params.orig_cl_ord_id(client_order_id.as_str());
1868        } else {
1869            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1870        }
1871
1872        if let Some(quantity) = quantity {
1873            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1874            params.order_qty(quantity_to_u32(&quantity, &instrument));
1875        }
1876
1877        if let Some(price) = price {
1878            params.price(price.as_f64());
1879        }
1880
1881        if let Some(trigger_price) = trigger_price {
1882            params.stop_px(trigger_price.as_f64());
1883        }
1884
1885        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1886
1887        let response = self.inner.amend_order(params).await?;
1888
1889        let order: BitmexOrder = serde_json::from_value(response)?;
1890
1891        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1892            let reason = order
1893                .ord_rej_reason
1894                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1895            anyhow::bail!("Order modification rejected: {reason}");
1896        }
1897
1898        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1899        let ts_init = self.generate_ts_init();
1900
1901        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1902    }
1903
1904    /// Query a single order by client order ID or venue order ID.
1905    ///
1906    /// # Errors
1907    ///
1908    /// Returns an error if:
1909    /// - Credentials are missing.
1910    /// - The request fails.
1911    /// - The API returns an error.
1912    pub async fn query_order(
1913        &self,
1914        instrument_id: InstrumentId,
1915        client_order_id: Option<ClientOrderId>,
1916        venue_order_id: Option<VenueOrderId>,
1917    ) -> anyhow::Result<Option<OrderStatusReport>> {
1918        let mut params = GetOrderParamsBuilder::default();
1919
1920        let filter_json = if let Some(client_order_id) = client_order_id {
1921            serde_json::json!({
1922                "clOrdID": client_order_id.to_string()
1923            })
1924        } else if let Some(venue_order_id) = venue_order_id {
1925            serde_json::json!({
1926                "orderID": venue_order_id.to_string()
1927            })
1928        } else {
1929            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1930        };
1931
1932        params.filter(filter_json);
1933        params.count(1); // Only need one order
1934
1935        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1936
1937        let response = self.inner.get_orders(params).await?;
1938
1939        if response.is_empty() {
1940            return Ok(None);
1941        }
1942
1943        let order = &response[0];
1944
1945        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1946        let ts_init = self.generate_ts_init();
1947
1948        let report =
1949            parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1950
1951        Ok(Some(report))
1952    }
1953
1954    /// Request a single order status report.
1955    ///
1956    /// # Errors
1957    ///
1958    /// Returns an error if:
1959    /// - Credentials are missing.
1960    /// - The request fails.
1961    /// - The API returns an error.
1962    pub async fn request_order_status_report(
1963        &self,
1964        instrument_id: InstrumentId,
1965        client_order_id: Option<ClientOrderId>,
1966        venue_order_id: Option<VenueOrderId>,
1967    ) -> anyhow::Result<OrderStatusReport> {
1968        if venue_order_id.is_none() && client_order_id.is_none() {
1969            anyhow::bail!("Either venue_order_id or client_order_id must be provided");
1970        }
1971
1972        let mut params = GetOrderParamsBuilder::default();
1973        params.symbol(instrument_id.symbol.as_str());
1974
1975        if let Some(venue_order_id) = venue_order_id {
1976            params.filter(serde_json::json!({
1977                "orderID": venue_order_id.as_str()
1978            }));
1979        } else if let Some(client_order_id) = client_order_id {
1980            params.filter(serde_json::json!({
1981                "clOrdID": client_order_id.as_str()
1982            }));
1983        }
1984
1985        params.count(1i32);
1986        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1987
1988        let response = self.inner.get_orders(params).await?;
1989
1990        let order = response
1991            .into_iter()
1992            .next()
1993            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1994
1995        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1996        let ts_init = self.generate_ts_init();
1997
1998        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1999    }
2000
2001    /// Request multiple order status reports.
2002    ///
2003    /// # Errors
2004    ///
2005    /// Returns an error if:
2006    /// - Credentials are missing.
2007    /// - The request fails.
2008    /// - The API returns an error.
2009    pub async fn request_order_status_reports(
2010        &self,
2011        instrument_id: Option<InstrumentId>,
2012        open_only: bool,
2013        start: Option<DateTime<Utc>>,
2014        end: Option<DateTime<Utc>>,
2015        limit: Option<u32>,
2016    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2017        if let (Some(start), Some(end)) = (start, end) {
2018            anyhow::ensure!(
2019                start < end,
2020                "Invalid time range: start={start:?} end={end:?}",
2021            );
2022        }
2023
2024        let mut params = GetOrderParamsBuilder::default();
2025
2026        if let Some(instrument_id) = &instrument_id {
2027            params.symbol(instrument_id.symbol.as_str());
2028        }
2029
2030        if open_only {
2031            params.filter(serde_json::json!({
2032                "open": true
2033            }));
2034        }
2035
2036        if let Some(start) = start {
2037            params.start_time(start);
2038        }
2039
2040        if let Some(end) = end {
2041            params.end_time(end);
2042        }
2043
2044        if let Some(limit) = limit {
2045            params.count(limit as i32);
2046        } else {
2047            params.count(500); // Default count to avoid empty query
2048        }
2049
2050        params.reverse(true); // Get newest orders first
2051
2052        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2053
2054        let response = self.inner.get_orders(params).await?;
2055
2056        let ts_init = self.generate_ts_init();
2057
2058        let mut reports = Vec::new();
2059
2060        for order in response {
2061            if let Some(start) = start {
2062                match order.timestamp {
2063                    Some(timestamp) if timestamp < start => continue,
2064                    Some(_) => {}
2065                    None => {
2066                        log::debug!("Skipping order report without timestamp for bounded query");
2067                        continue;
2068                    }
2069                }
2070            }
2071
2072            if let Some(end) = end {
2073                match order.timestamp {
2074                    Some(timestamp) if timestamp > end => continue,
2075                    Some(_) => {}
2076                    None => {
2077                        log::debug!("Skipping order report without timestamp for bounded query");
2078                        continue;
2079                    }
2080                }
2081            }
2082
2083            // Skip orders without symbol (can happen with query responses)
2084            let Some(symbol) = order.symbol else {
2085                log::warn!("Order response missing symbol, skipping");
2086                continue;
2087            };
2088
2089            let Ok(instrument) = self.instrument_from_cache(symbol) else {
2090                log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2091                continue;
2092            };
2093
2094            match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2095                Ok(report) => reports.push(report),
2096                Err(e) => log::error!("Failed to parse order status report: {e}"),
2097            }
2098        }
2099
2100        Self::populate_linked_order_ids(&mut reports);
2101
2102        Ok(reports)
2103    }
2104
2105    /// Request trades for the given instrument.
2106    ///
2107    /// # Errors
2108    ///
2109    /// Returns an error if the HTTP request fails or parsing fails.
2110    pub async fn request_trades(
2111        &self,
2112        instrument_id: InstrumentId,
2113        start: Option<DateTime<Utc>>,
2114        end: Option<DateTime<Utc>>,
2115        limit: Option<u32>,
2116    ) -> anyhow::Result<Vec<TradeTick>> {
2117        let mut params = GetTradeParamsBuilder::default();
2118        params.symbol(instrument_id.symbol.as_str());
2119
2120        if let Some(start) = start {
2121            params.start_time(start);
2122        }
2123
2124        if let Some(end) = end {
2125            params.end_time(end);
2126        }
2127
2128        if let (Some(start), Some(end)) = (start, end) {
2129            anyhow::ensure!(
2130                start < end,
2131                "Invalid time range: start={start:?} end={end:?}",
2132            );
2133        }
2134
2135        if let Some(limit) = limit {
2136            let clamped_limit = limit.min(1000);
2137            if limit > 1000 {
2138                log::warn!(
2139                    "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2140                );
2141            }
2142            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2143        }
2144        params.reverse(false);
2145        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2146
2147        let response = self.inner.get_trades(params).await?;
2148
2149        let ts_init = self.generate_ts_init();
2150
2151        let mut parsed_trades = Vec::new();
2152
2153        for trade in response {
2154            if let Some(start) = start
2155                && trade.timestamp < start
2156            {
2157                continue;
2158            }
2159
2160            if let Some(end) = end
2161                && trade.timestamp > end
2162            {
2163                continue;
2164            }
2165
2166            let Some(instrument) = self.get_instrument(&trade.symbol) else {
2167                log::error!(
2168                    "Instrument {} not found in cache, skipping trade",
2169                    trade.symbol
2170                );
2171                continue;
2172            };
2173
2174            match parse_trade(trade, &instrument, ts_init) {
2175                Ok(trade) => parsed_trades.push(trade),
2176                Err(e) => log::error!("Failed to parse trade: {e}"),
2177            }
2178        }
2179
2180        Ok(parsed_trades)
2181    }
2182
2183    /// Request bars for the given bar type.
2184    ///
2185    /// # Errors
2186    ///
2187    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
2188    /// unsupported by BitMEX.
2189    pub async fn request_bars(
2190        &self,
2191        mut bar_type: BarType,
2192        start: Option<DateTime<Utc>>,
2193        end: Option<DateTime<Utc>>,
2194        limit: Option<u32>,
2195        partial: bool,
2196    ) -> anyhow::Result<Vec<Bar>> {
2197        bar_type = bar_type.standard();
2198
2199        anyhow::ensure!(
2200            bar_type.aggregation_source() == AggregationSource::External,
2201            "Only EXTERNAL aggregation bars are supported"
2202        );
2203        anyhow::ensure!(
2204            bar_type.spec().price_type == PriceType::Last,
2205            "Only LAST price type bars are supported"
2206        );
2207        if let (Some(start), Some(end)) = (start, end) {
2208            anyhow::ensure!(
2209                start < end,
2210                "Invalid time range: start={start:?} end={end:?}"
2211            );
2212        }
2213
2214        let spec = bar_type.spec();
2215        let bin_size = match (spec.aggregation, spec.step.get()) {
2216            (BarAggregation::Minute, 1) => "1m",
2217            (BarAggregation::Minute, 5) => "5m",
2218            (BarAggregation::Hour, 1) => "1h",
2219            (BarAggregation::Day, 1) => "1d",
2220            _ => anyhow::bail!(
2221                "BitMEX does not support {}-{:?}-{:?} bars",
2222                spec.step.get(),
2223                spec.aggregation,
2224                spec.price_type,
2225            ),
2226        };
2227
2228        let instrument_id = bar_type.instrument_id();
2229        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2230
2231        let mut params = GetTradeBucketedParamsBuilder::default();
2232        params.symbol(instrument_id.symbol.as_str());
2233        params.bin_size(bin_size);
2234        if partial {
2235            params.partial(true);
2236        }
2237        if let Some(start) = start {
2238            params.start_time(start);
2239        }
2240        if let Some(end) = end {
2241            params.end_time(end);
2242        }
2243        if let Some(limit) = limit {
2244            let clamped_limit = limit.min(1000);
2245            if limit > 1000 {
2246                log::warn!(
2247                    "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2248                );
2249            }
2250            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2251        }
2252        params.reverse(false);
2253        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2254
2255        let response = self.inner.get_trade_bucketed(params).await?;
2256        let ts_init = self.generate_ts_init();
2257        let mut bars = Vec::new();
2258
2259        for bin in response {
2260            if let Some(start) = start
2261                && bin.timestamp < start
2262            {
2263                continue;
2264            }
2265            if let Some(end) = end
2266                && bin.timestamp > end
2267            {
2268                continue;
2269            }
2270            if bin.symbol != instrument_id.symbol.inner() {
2271                log::warn!(
2272                    "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2273                    bin.symbol,
2274                    instrument_id.symbol,
2275                );
2276                continue;
2277            }
2278
2279            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2280                Ok(bar) => bars.push(bar),
2281                Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2282            }
2283        }
2284
2285        Ok(bars)
2286    }
2287
2288    /// Request fill reports for the given instrument.
2289    ///
2290    /// # Errors
2291    ///
2292    /// Returns an error if the HTTP request fails or parsing fails.
2293    pub async fn request_fill_reports(
2294        &self,
2295        instrument_id: Option<InstrumentId>,
2296        start: Option<DateTime<Utc>>,
2297        end: Option<DateTime<Utc>>,
2298        limit: Option<u32>,
2299    ) -> anyhow::Result<Vec<FillReport>> {
2300        if let (Some(start), Some(end)) = (start, end) {
2301            anyhow::ensure!(
2302                start < end,
2303                "Invalid time range: start={start:?} end={end:?}",
2304            );
2305        }
2306
2307        let mut params = GetExecutionParamsBuilder::default();
2308        if let Some(instrument_id) = instrument_id {
2309            params.symbol(instrument_id.symbol.as_str());
2310        }
2311        if let Some(start) = start {
2312            params.start_time(start);
2313        }
2314        if let Some(end) = end {
2315            params.end_time(end);
2316        }
2317        if let Some(limit) = limit {
2318            params.count(limit as i32);
2319        } else {
2320            params.count(500); // Default count
2321        }
2322        params.reverse(true); // Get newest fills first
2323
2324        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2325
2326        let response = self.inner.get_executions(params).await?;
2327
2328        let ts_init = self.generate_ts_init();
2329
2330        let mut reports = Vec::new();
2331
2332        for exec in response {
2333            if let Some(start) = start {
2334                match exec.transact_time {
2335                    Some(timestamp) if timestamp < start => continue,
2336                    Some(_) => {}
2337                    None => {
2338                        log::debug!("Skipping fill report without transact_time for bounded query");
2339                        continue;
2340                    }
2341                }
2342            }
2343
2344            if let Some(end) = end {
2345                match exec.transact_time {
2346                    Some(timestamp) if timestamp > end => continue,
2347                    Some(_) => {}
2348                    None => {
2349                        log::debug!("Skipping fill report without transact_time for bounded query");
2350                        continue;
2351                    }
2352                }
2353            }
2354
2355            // Skip executions without symbol (e.g., CancelReject)
2356            let Some(symbol) = exec.symbol else {
2357                log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2358                continue;
2359            };
2360            let symbol_str = symbol.to_string();
2361
2362            let instrument = match self.instrument_from_cache(symbol) {
2363                Ok(instrument) => instrument,
2364                Err(e) => {
2365                    log::error!(
2366                        "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2367                    );
2368                    continue;
2369                }
2370            };
2371
2372            match parse_fill_report(exec, &instrument, ts_init) {
2373                Ok(report) => reports.push(report),
2374                Err(e) => {
2375                    // Log at debug level for expected skip cases
2376                    let error_msg = e.to_string();
2377                    if error_msg.starts_with("Skipping non-trade execution")
2378                        || error_msg.starts_with("Skipping execution without order_id")
2379                    {
2380                        log::debug!("{e}");
2381                    } else {
2382                        log::error!("Failed to parse fill report: {e}");
2383                    }
2384                }
2385            }
2386        }
2387
2388        Ok(reports)
2389    }
2390
2391    /// Request position reports.
2392    ///
2393    /// # Errors
2394    ///
2395    /// Returns an error if the HTTP request fails or parsing fails.
2396    pub async fn request_position_status_reports(
2397        &self,
2398    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2399        let params = GetPositionParamsBuilder::default()
2400            .count(500) // Default count
2401            .build()
2402            .map_err(|e| anyhow::anyhow!(e))?;
2403
2404        let response = self.inner.get_positions(params).await?;
2405
2406        let ts_init = self.generate_ts_init();
2407
2408        let mut reports = Vec::new();
2409
2410        for pos in response {
2411            let symbol = pos.symbol;
2412            let instrument = match self.instrument_from_cache(symbol) {
2413                Ok(instrument) => instrument,
2414                Err(e) => {
2415                    log::error!(
2416                        "Instrument not found in cache for position parsing: symbol={}, {e}",
2417                        pos.symbol.as_str(),
2418                    );
2419                    continue;
2420                }
2421            };
2422
2423            match parse_position_report(pos, &instrument, ts_init) {
2424                Ok(report) => reports.push(report),
2425                Err(e) => log::error!("Failed to parse position report: {e}"),
2426            }
2427        }
2428
2429        Ok(reports)
2430    }
2431
2432    /// Update position leverage.
2433    ///
2434    /// # Errors
2435    ///
2436    /// - Credentials are missing.
2437    /// - The request fails.
2438    /// - The API returns an error.
2439    pub async fn update_position_leverage(
2440        &self,
2441        symbol: &str,
2442        leverage: f64,
2443    ) -> anyhow::Result<PositionStatusReport> {
2444        let params = PostPositionLeverageParams {
2445            symbol: symbol.to_string(),
2446            leverage,
2447            target_account_id: None,
2448        };
2449
2450        let response = self.inner.update_position_leverage(params).await?;
2451
2452        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2453        let ts_init = self.generate_ts_init();
2454
2455        parse_position_report(response, &instrument, ts_init)
2456    }
2457}
2458
2459#[cfg(test)]
2460mod tests {
2461    use nautilus_core::UUID4;
2462    use nautilus_model::enums::OrderStatus;
2463    use rstest::rstest;
2464    use serde_json::json;
2465
2466    use super::*;
2467
2468    fn build_report(
2469        client_order_id: &str,
2470        venue_order_id: &str,
2471        contingency_type: ContingencyType,
2472        order_list_id: Option<&str>,
2473    ) -> OrderStatusReport {
2474        let mut report = OrderStatusReport::new(
2475            AccountId::from("BITMEX-1"),
2476            InstrumentId::from("XBTUSD.BITMEX"),
2477            Some(ClientOrderId::from(client_order_id)),
2478            VenueOrderId::from(venue_order_id),
2479            OrderSide::Buy,
2480            OrderType::Limit,
2481            TimeInForce::Gtc,
2482            OrderStatus::Accepted,
2483            Quantity::new(100.0, 0),
2484            Quantity::default(),
2485            UnixNanos::from(1_u64),
2486            UnixNanos::from(1_u64),
2487            UnixNanos::from(1_u64),
2488            Some(UUID4::new()),
2489        );
2490
2491        if let Some(id) = order_list_id {
2492            report = report.with_order_list_id(OrderListId::from(id));
2493        }
2494
2495        report.with_contingency_type(contingency_type)
2496    }
2497
2498    #[rstest]
2499    fn test_sign_request_generates_correct_headers() {
2500        let client = BitmexRawHttpClient::with_credentials(
2501            "test_api_key".to_string(),
2502            "test_api_secret".to_string(),
2503            "http://localhost:8080".to_string(),
2504            Some(60),
2505            None, // max_retries
2506            None, // retry_delay_ms
2507            None, // retry_delay_max_ms
2508            None, // recv_window_ms
2509            None, // max_requests_per_second
2510            None, // max_requests_per_minute
2511            None, // proxy_url
2512        )
2513        .expect("Failed to create test client");
2514
2515        let headers = client
2516            .sign_request(&Method::GET, "/api/v1/order", None)
2517            .unwrap();
2518
2519        assert!(headers.contains_key("api-key"));
2520        assert!(headers.contains_key("api-signature"));
2521        assert!(headers.contains_key("api-expires"));
2522        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2523    }
2524
2525    #[rstest]
2526    fn test_sign_request_with_body() {
2527        let client = BitmexRawHttpClient::with_credentials(
2528            "test_api_key".to_string(),
2529            "test_api_secret".to_string(),
2530            "http://localhost:8080".to_string(),
2531            Some(60),
2532            None, // max_retries
2533            None, // retry_delay_ms
2534            None, // retry_delay_max_ms
2535            None, // recv_window_ms
2536            None, // max_requests_per_second
2537            None, // max_requests_per_minute
2538            None, // proxy_url
2539        )
2540        .expect("Failed to create test client");
2541
2542        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2543        let body_bytes = serde_json::to_vec(&body).unwrap();
2544
2545        let headers_without_body = client
2546            .sign_request(&Method::POST, "/api/v1/order", None)
2547            .unwrap();
2548        let headers_with_body = client
2549            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2550            .unwrap();
2551
2552        // Signatures should be different when body is included
2553        assert_ne!(
2554            headers_without_body.get("api-signature").unwrap(),
2555            headers_with_body.get("api-signature").unwrap()
2556        );
2557    }
2558
2559    #[rstest]
2560    fn test_sign_request_uses_custom_recv_window() {
2561        let client_default = BitmexRawHttpClient::with_credentials(
2562            "test_api_key".to_string(),
2563            "test_api_secret".to_string(),
2564            "http://localhost:8080".to_string(),
2565            Some(60),
2566            None,
2567            None,
2568            None,
2569            None, // Use default recv_window_ms (10000ms = 10s)
2570            None, // max_requests_per_second
2571            None, // max_requests_per_minute
2572            None, // proxy_url
2573        )
2574        .expect("Failed to create test client");
2575
2576        let client_custom = BitmexRawHttpClient::with_credentials(
2577            "test_api_key".to_string(),
2578            "test_api_secret".to_string(),
2579            "http://localhost:8080".to_string(),
2580            Some(60),
2581            None,
2582            None,
2583            None,
2584            Some(30_000), // 30 seconds
2585            None,         // max_requests_per_second
2586            None,         // max_requests_per_minute
2587            None,         // proxy_url
2588        )
2589        .expect("Failed to create test client");
2590
2591        let headers_default = client_default
2592            .sign_request(&Method::GET, "/api/v1/order", None)
2593            .unwrap();
2594        let headers_custom = client_custom
2595            .sign_request(&Method::GET, "/api/v1/order", None)
2596            .unwrap();
2597
2598        // Parse expires timestamps
2599        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2600        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2601
2602        // Verify both are valid future timestamps
2603        let now = Utc::now().timestamp();
2604        assert!(expires_default > now);
2605        assert!(expires_custom > now);
2606
2607        // Custom window should be greater than default
2608        assert!(expires_custom > expires_default);
2609
2610        // The difference should be approximately 20 seconds (30s - 10s)
2611        // Allow wider tolerance for delays between calls on slow CI runners
2612        let diff = expires_custom - expires_default;
2613        assert!((18..=25).contains(&diff));
2614    }
2615
2616    #[rstest]
2617    fn test_populate_linked_order_ids_from_order_list() {
2618        let base = "O-20250922-002219-001-000";
2619        let entry = format!("{base}-1");
2620        let stop = format!("{base}-2");
2621        let take = format!("{base}-3");
2622
2623        let mut reports = vec![
2624            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2625            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2626            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2627        ];
2628
2629        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2630
2631        assert_eq!(
2632            reports[0].linked_order_ids,
2633            Some(vec![
2634                ClientOrderId::from(stop.as_str()),
2635                ClientOrderId::from(take.as_str()),
2636            ]),
2637        );
2638        assert_eq!(
2639            reports[1].linked_order_ids,
2640            Some(vec![
2641                ClientOrderId::from(entry.as_str()),
2642                ClientOrderId::from(take.as_str()),
2643            ]),
2644        );
2645        assert_eq!(
2646            reports[2].linked_order_ids,
2647            Some(vec![
2648                ClientOrderId::from(entry.as_str()),
2649                ClientOrderId::from(stop.as_str()),
2650            ]),
2651        );
2652    }
2653
2654    #[rstest]
2655    fn test_populate_linked_order_ids_from_id_prefix() {
2656        let base = "O-20250922-002220-001-000";
2657        let entry = format!("{base}-1");
2658        let stop = format!("{base}-2");
2659        let take = format!("{base}-3");
2660
2661        let mut reports = vec![
2662            build_report(&entry, "V-1", ContingencyType::Oto, None),
2663            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2664            build_report(&take, "V-3", ContingencyType::Ouo, None),
2665        ];
2666
2667        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2668
2669        assert_eq!(
2670            reports[0].linked_order_ids,
2671            Some(vec![
2672                ClientOrderId::from(stop.as_str()),
2673                ClientOrderId::from(take.as_str()),
2674            ]),
2675        );
2676        assert_eq!(
2677            reports[1].linked_order_ids,
2678            Some(vec![
2679                ClientOrderId::from(entry.as_str()),
2680                ClientOrderId::from(take.as_str()),
2681            ]),
2682        );
2683        assert_eq!(
2684            reports[2].linked_order_ids,
2685            Some(vec![
2686                ClientOrderId::from(entry.as_str()),
2687                ClientOrderId::from(stop.as_str()),
2688            ]),
2689        );
2690    }
2691
2692    #[rstest]
2693    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2694        let base = "O-20250922-002221-001-000";
2695        let entry = format!("{base}-1");
2696        let passive = format!("{base}-2");
2697
2698        let mut reports = vec![
2699            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2700            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2701        ];
2702
2703        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2704
2705        // Non-contingent orders should not be linked
2706        assert!(reports[0].linked_order_ids.is_none());
2707
2708        // A contingent order with no other contingent peers should have contingency reset
2709        assert!(reports[1].linked_order_ids.is_none());
2710        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2711    }
2712}