1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_env_var,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46 TimeInForce, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::HttpClient,
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use reqwest::{Method, StatusCode, header::USER_AGENT};
60use serde::{Deserialize, Serialize, de::DeserializeOwned};
61use serde_json::Value;
62use tokio_util::sync::CancellationToken;
63use ustr::Ustr;
64
65use super::{
66 error::{BitmexErrorResponse, BitmexHttpError},
67 models::{
68 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
69 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
70 },
71 query::{
72 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
73 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
74 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
75 PostPositionLeverageParams, PutOrderParams,
76 },
77};
78use crate::{
79 common::{
80 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
81 credential::Credential,
82 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
83 parse::{parse_account_state, quantity_to_u32},
84 },
85 http::{
86 parse::{
87 parse_fill_report, parse_instrument_any, parse_order_status_report,
88 parse_position_report, parse_trade, parse_trade_bin,
89 },
90 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
91 },
92 websocket::messages::BitmexMarginMsg,
93};
94
/// Default cap on requests per second, used when the caller does not supply one.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Default cap on requests per minute for clients constructed with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Default cap on requests per minute for clients constructed without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
106
/// Generic (de)serializable envelope carrying a list of `T` items in a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The items carried by the response.
    pub data: Vec<T>,
}
113
/// Low-level BitMEX HTTP client: builds URLs, signs requests when credentials
/// are present, and sends them through a rate-limited, retrying transport.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Prefix prepended to every endpoint (and its query string) to form the request URL.
    base_url: String,
    /// Underlying HTTP client that every request is sent through.
    client: HttpClient,
    /// API credential used for signing; `None` makes authenticated calls fail
    /// with `BitmexHttpError::MissingCredentials`.
    credential: Option<Credential>,
    /// Receive window in milliseconds; converted to seconds and added to the
    /// current time to produce the `api-expires` header.
    recv_window_ms: u64,
    /// Drives the retry loop around each request.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token passed to the retry loop; cancelling it is how in-flight requests are aborted.
    cancellation_token: CancellationToken,
}
142
143impl Default for BitmexRawHttpClient {
144 fn default() -> Self {
145 Self::new(None, Some(60), None, None, None, None, None, None, None)
146 .expect("Failed to create default BitmexHttpInnerClient")
147 }
148}
149
150impl BitmexRawHttpClient {
151 #[allow(clippy::too_many_arguments)]
161 pub fn new(
162 base_url: Option<String>,
163 timeout_secs: Option<u64>,
164 max_retries: Option<u32>,
165 retry_delay_ms: Option<u64>,
166 retry_delay_max_ms: Option<u64>,
167 recv_window_ms: Option<u64>,
168 max_requests_per_second: Option<u32>,
169 max_requests_per_minute: Option<u32>,
170 proxy_url: Option<String>,
171 ) -> Result<Self, BitmexHttpError> {
172 let retry_config = RetryConfig {
173 max_retries: max_retries.unwrap_or(3),
174 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
175 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
176 backoff_factor: 2.0,
177 jitter_ms: 1000,
178 operation_timeout_ms: Some(60_000),
179 immediate_first: false,
180 max_elapsed_ms: Some(180_000),
181 };
182
183 let retry_manager = RetryManager::new(retry_config);
184
185 let max_req_per_sec =
186 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
187 let max_req_per_min =
188 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
189
190 Ok(Self {
191 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
192 client: HttpClient::new(
193 Self::default_headers(),
194 vec![],
195 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
196 Some(Self::default_quota(max_req_per_sec)),
197 timeout_secs,
198 proxy_url,
199 )
200 .map_err(|e| {
201 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
202 })?,
203 credential: None,
204 recv_window_ms: recv_window_ms.unwrap_or(10_000),
205 retry_manager,
206 cancellation_token: CancellationToken::new(),
207 })
208 }
209
210 #[allow(clippy::too_many_arguments)]
217 pub fn with_credentials(
218 api_key: String,
219 api_secret: String,
220 base_url: String,
221 timeout_secs: Option<u64>,
222 max_retries: Option<u32>,
223 retry_delay_ms: Option<u64>,
224 retry_delay_max_ms: Option<u64>,
225 recv_window_ms: Option<u64>,
226 max_requests_per_second: Option<u32>,
227 max_requests_per_minute: Option<u32>,
228 proxy_url: Option<String>,
229 ) -> Result<Self, BitmexHttpError> {
230 let retry_config = RetryConfig {
231 max_retries: max_retries.unwrap_or(3),
232 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
233 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
234 backoff_factor: 2.0,
235 jitter_ms: 1000,
236 operation_timeout_ms: Some(60_000),
237 immediate_first: false,
238 max_elapsed_ms: Some(180_000),
239 };
240
241 let retry_manager = RetryManager::new(retry_config);
242
243 let max_req_per_sec =
244 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
245 let max_req_per_min =
246 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
247
248 Ok(Self {
249 base_url,
250 client: HttpClient::new(
251 Self::default_headers(),
252 vec![],
253 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
254 Some(Self::default_quota(max_req_per_sec)),
255 timeout_secs,
256 proxy_url,
257 )
258 .map_err(|e| {
259 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
260 })?,
261 credential: Some(Credential::new(api_key, api_secret)),
262 recv_window_ms: recv_window_ms.unwrap_or(10_000),
263 retry_manager,
264 cancellation_token: CancellationToken::new(),
265 })
266 }
267
268 fn default_headers() -> HashMap<String, String> {
269 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270 }
271
272 fn default_quota(max_requests_per_second: u32) -> Quota {
273 Quota::per_second(
274 NonZeroU32::new(max_requests_per_second)
275 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276 )
277 }
278
279 fn rate_limiter_quotas(
280 max_requests_per_second: u32,
281 max_requests_per_minute: u32,
282 ) -> Vec<(String, Quota)> {
283 let per_sec_quota = Quota::per_second(
284 NonZeroU32::new(max_requests_per_second)
285 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286 );
287 let per_min_quota =
288 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290 }));
291
292 vec![
293 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295 ]
296 }
297
298 fn rate_limit_keys() -> Vec<Ustr> {
299 vec![
300 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301 Ustr::from(BITMEX_MINUTE_RATE_KEY),
302 ]
303 }
304
    /// Cancels this client's cancellation token, which is the token handed to
    /// the retry loop for every request.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
309
    /// Returns a reference to this client's cancellation token.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
314
315 fn sign_request(
316 &self,
317 method: &Method,
318 endpoint: &str,
319 body: Option<&[u8]>,
320 ) -> Result<HashMap<String, String>, BitmexHttpError> {
321 let credential = self
322 .credential
323 .as_ref()
324 .ok_or(BitmexHttpError::MissingCredentials)?;
325
326 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
327 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
328
329 let full_path = if endpoint.starts_with("/api/v1") {
330 endpoint.to_string()
331 } else {
332 format!("/api/v1{endpoint}")
333 };
334
335 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
336
337 let mut headers = HashMap::new();
338 headers.insert("api-expires".to_string(), expires.to_string());
339 headers.insert("api-key".to_string(), credential.api_key.to_string());
340 headers.insert("api-signature".to_string(), signature);
341
342 if body.is_some()
344 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
345 {
346 headers.insert(
347 "Content-Type".to_string(),
348 "application/x-www-form-urlencoded".to_string(),
349 );
350 }
351
352 Ok(headers)
353 }
354
    /// Sends one request through the retry manager and deserializes the response.
    ///
    /// - `params` is URL-encoded into the query string for GET and DELETE
    ///   requests only; for other methods it is ignored.
    /// - `body` is sent as the request body as-is.
    /// - When `authenticate` is true the request is signed with `sign_request`.
    ///
    /// # Errors
    ///
    /// Returns a `BitmexHttpError` when parameter serialization fails, signing
    /// fails, the transport fails, the response is not a success status, or the
    /// response body cannot be deserialized into `T`.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        endpoint: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<T, BitmexHttpError> {
        let endpoint = endpoint.to_string();
        let method_clone = method.clone();
        let body_clone = body.clone();

        // Only GET and DELETE carry `params` in the query string.
        let params_str = if method == Method::GET || method == Method::DELETE {
            params
                .map(serde_urlencoded::to_string)
                .transpose()
                .map_err(|e| {
                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
                })?
        } else {
            None
        };

        // An empty serialized query leaves the endpoint untouched (no trailing `?`).
        let full_endpoint = if let Some(ref query) = params_str {
            if query.is_empty() {
                endpoint.clone()
            } else {
                format!("{endpoint}?{query}")
            }
        } else {
            endpoint.clone()
        };

        let url = format!("{}{}", self.base_url, full_endpoint);

        // Each invocation clones its inputs and signs inside the future, so
        // every call of the closure computes its own headers (and expiry).
        let operation = || {
            let url = url.clone();
            let method = method_clone.clone();
            let body = body_clone.clone();
            let full_endpoint = full_endpoint.clone();

            async move {
                // The signature covers the endpoint including its query string.
                let headers = if authenticate {
                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
                } else {
                    None
                };

                let rate_keys = Self::rate_limit_keys();
                let resp = self
                    .client
                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
                    .await?;

                if resp.status.is_success() {
                    serde_json::from_slice(&resp.body).map_err(Into::into)
                } else if let Ok(error_resp) =
                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
                {
                    // The body parsed as a structured BitMEX error payload.
                    Err(error_resp.into())
                } else {
                    // Unparseable error body: report the raw status and text.
                    Err(BitmexHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16())
                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
                        body: String::from_utf8_lossy(&resp.body).to_string(),
                    })
                }
            }
        };

        // Retryable: network errors, 5xx/429 statuses, and BitMEX errors that
        // indicate rate limiting. Everything else is treated as final.
        let should_retry = |error: &BitmexHttpError| -> bool {
            match error {
                BitmexHttpError::NetworkError(_) => true,
                BitmexHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                BitmexHttpError::BitmexError {
                    error_name,
                    message,
                } => {
                    error_name == "RateLimitError"
                        || (error_name == "HTTPError"
                            && message.to_lowercase().contains("rate limit"))
                }
                _ => false,
            }
        };

        // Maps a message produced by the retry manager into an error: the exact
        // text "canceled" becomes `Canceled`, anything else a `NetworkError`.
        let create_error = |msg: String| -> BitmexHttpError {
            if msg == "canceled" {
                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                BitmexHttpError::NetworkError(msg)
            }
        };

        // The bare endpoint (without query) is the first argument to the retry manager.
        self.retry_manager
            .execute_with_retry_with_cancel(
                endpoint.as_str(),
                operation,
                should_retry,
                create_error,
                &self.cancellation_token,
            )
            .await
    }
479
480 pub async fn get_instruments(
486 &self,
487 active_only: bool,
488 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
489 let path = if active_only {
490 "/instrument/active"
491 } else {
492 "/instrument"
493 };
494 self.send_request::<_, ()>(Method::GET, path, None, None, false)
495 .await
496 }
497
498 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
508 let response: BitmexApiInfo = self
509 .send_request::<_, ()>(Method::GET, "", None, None, false)
510 .await?;
511 Ok(response.timestamp)
512 }
513
514 pub async fn get_instrument(
525 &self,
526 symbol: &str,
527 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
528 let path = &format!("/instrument?symbol={symbol}");
529 let instruments: Vec<BitmexInstrument> = self
530 .send_request::<_, ()>(Method::GET, path, None, None, false)
531 .await?;
532
533 Ok(instruments.into_iter().next())
534 }
535
536 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
542 let endpoint = "/user/wallet";
543 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
544 .await
545 }
546
547 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
553 let path = format!("/user/margin?currency={currency}");
554 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
555 .await
556 }
557
558 pub async fn get_trades(
568 &self,
569 params: GetTradeParams,
570 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
571 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
572 .await
573 }
574
575 pub async fn get_trade_bucketed(
581 &self,
582 params: GetTradeBucketedParams,
583 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
584 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
585 .await
586 }
587
588 pub async fn get_orders(
598 &self,
599 params: GetOrderParams,
600 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
601 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
602 .await
603 }
604
605 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
615 let body = serde_urlencoded::to_string(¶ms)
617 .map_err(|e| {
618 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
619 })?
620 .into_bytes();
621 let path = "/order";
622 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
623 .await
624 }
625
626 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
636 let body = serde_urlencoded::to_string(¶ms)
638 .map_err(|e| {
639 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
640 })?
641 .into_bytes();
642 let path = "/order";
643 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
644 .await
645 }
646
647 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
657 let body = serde_urlencoded::to_string(¶ms)
659 .map_err(|e| {
660 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
661 })?
662 .into_bytes();
663 let path = "/order";
664 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
665 .await
666 }
667
668 pub async fn cancel_all_orders(
682 &self,
683 params: DeleteAllOrdersParams,
684 ) -> Result<Value, BitmexHttpError> {
685 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
686 .await
687 }
688
689 pub async fn get_executions(
699 &self,
700 params: GetExecutionParams,
701 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
702 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
703 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
704 })?;
705 let path = format!("/execution/tradeHistory?{query}");
706 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
707 .await
708 }
709
710 pub async fn get_positions(
720 &self,
721 params: GetPositionParams,
722 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
723 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
724 .await
725 }
726
727 pub async fn update_position_leverage(
737 &self,
738 params: PostPositionLeverageParams,
739 ) -> Result<BitmexPosition, BitmexHttpError> {
740 let body = serde_urlencoded::to_string(¶ms)
742 .map_err(|e| {
743 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
744 })?
745 .into_bytes();
746 let path = "/position/leverage";
747 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
748 .await
749 }
750}
751
/// Higher-level BitMEX HTTP client wrapping a shared [`BitmexRawHttpClient`]
/// together with an instrument cache.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexHttpClient {
    /// Shared raw client that performs the actual HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Cached instruments, keyed by the instrument's raw symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once an instrument has been cached through `cache_instrument`.
    cache_initialized: AtomicBool,
}
766
767impl Clone for BitmexHttpClient {
768 fn clone(&self) -> Self {
769 let cache_initialized = AtomicBool::new(false);
770
771 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
772 if is_initialized {
773 cache_initialized.store(true, Ordering::Release);
774 }
775
776 Self {
777 inner: self.inner.clone(),
778 instruments_cache: self.instruments_cache.clone(),
779 cache_initialized,
780 }
781 }
782}
783
impl Default for BitmexHttpClient {
    /// Builds a client with no explicit URL or credentials, `testnet` off, a
    /// `timeout_secs` of 60, and every other option left unset.
    ///
    /// # Panics
    ///
    /// Panics if the client cannot be constructed.
    fn default() -> Self {
        Self::new(
            None, // base_url
            None, // api_key
            None, // api_secret
            false, // testnet
            Some(60), // timeout_secs
            None, // max_retries
            None, // retry_delay_ms
            None, // retry_delay_max_ms
            None, // recv_window_ms
            None, // max_requests_per_second
            None, // max_requests_per_minute
            None, // proxy_url
        )
        .expect("Failed to create default BitmexHttpClient")
    }
}
803
804impl BitmexHttpClient {
805 #[allow(clippy::too_many_arguments)]
811 pub fn new(
812 base_url: Option<String>,
813 api_key: Option<String>,
814 api_secret: Option<String>,
815 testnet: bool,
816 timeout_secs: Option<u64>,
817 max_retries: Option<u32>,
818 retry_delay_ms: Option<u64>,
819 retry_delay_max_ms: Option<u64>,
820 recv_window_ms: Option<u64>,
821 max_requests_per_second: Option<u32>,
822 max_requests_per_minute: Option<u32>,
823 proxy_url: Option<String>,
824 ) -> Result<Self, BitmexHttpError> {
825 let url = base_url.unwrap_or_else(|| {
827 if testnet {
828 BITMEX_HTTP_TESTNET_URL.to_string()
829 } else {
830 BITMEX_HTTP_URL.to_string()
831 }
832 });
833
834 let inner = match (api_key, api_secret) {
835 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
836 key,
837 secret,
838 url,
839 timeout_secs,
840 max_retries,
841 retry_delay_ms,
842 retry_delay_max_ms,
843 recv_window_ms,
844 max_requests_per_second,
845 max_requests_per_minute,
846 proxy_url,
847 )?,
848 _ => BitmexRawHttpClient::new(
849 Some(url),
850 timeout_secs,
851 max_retries,
852 retry_delay_ms,
853 retry_delay_max_ms,
854 recv_window_ms,
855 max_requests_per_second,
856 max_requests_per_minute,
857 proxy_url,
858 )?,
859 };
860
861 Ok(Self {
862 inner: Arc::new(inner),
863 instruments_cache: Arc::new(DashMap::new()),
864 cache_initialized: AtomicBool::new(false),
865 })
866 }
867
868 pub fn from_env() -> anyhow::Result<Self> {
875 Self::with_credentials(
876 None, None, None, None, None, None, None, None, None, None, None,
877 )
878 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
879 }
880
881 #[allow(clippy::too_many_arguments)]
891 pub fn with_credentials(
892 api_key: Option<String>,
893 api_secret: Option<String>,
894 base_url: Option<String>,
895 timeout_secs: Option<u64>,
896 max_retries: Option<u32>,
897 retry_delay_ms: Option<u64>,
898 retry_delay_max_ms: Option<u64>,
899 recv_window_ms: Option<u64>,
900 max_requests_per_second: Option<u32>,
901 max_requests_per_minute: Option<u32>,
902 proxy_url: Option<String>,
903 ) -> anyhow::Result<Self> {
904 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
906
907 let (key_var, secret_var) = if testnet {
909 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
910 } else {
911 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
912 };
913
914 let api_key = api_key.or_else(|| get_env_var(key_var).ok());
915 let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
916
917 if api_key.is_some() && api_secret.is_none() {
919 anyhow::bail!("{secret_var} is required when {key_var} is provided");
920 }
921 if api_key.is_none() && api_secret.is_some() {
922 anyhow::bail!("{key_var} is required when {secret_var} is provided");
923 }
924
925 Self::new(
926 base_url,
927 api_key,
928 api_secret,
929 testnet,
930 timeout_secs,
931 max_retries,
932 retry_delay_ms,
933 retry_delay_max_ms,
934 recv_window_ms,
935 max_requests_per_second,
936 max_requests_per_minute,
937 proxy_url,
938 )
939 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
940 }
941
    /// Returns the base URL the underlying raw client sends requests to.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url.as_str()
    }
947
    /// Returns the API key of the configured credential, or `None` when the
    /// client has no credential.
    #[must_use]
    pub fn api_key(&self) -> Option<&str> {
        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
    }
953
    /// Returns the credential's masked API key (as produced by
    /// `Credential::api_key_masked`), or `None` when the client has no credential.
    #[must_use]
    pub fn api_key_masked(&self) -> Option<String> {
        self.inner.credential.as_ref().map(|c| c.api_key_masked())
    }
959
    /// Requests the server time by delegating to the raw client's `get_server_time`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying request fails.
    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
        self.inner.get_server_time().await
    }
970
    /// Returns the current time in nanoseconds as read from the realtime atomic clock.
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
975
976 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
978 matches!(
979 contingency_type,
980 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
981 )
982 }
983
984 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
986 matches!(
987 contingency_type,
988 ContingencyType::Oco | ContingencyType::Oto
989 )
990 }
991
    /// Fills in `linked_order_ids` and `parent_order_id` on contingent order
    /// status reports by grouping the reports in the slice.
    ///
    /// Two grouping strategies are tried per report, in order:
    /// 1. reports sharing the same `order_list_id`;
    /// 2. contingent reports whose client order IDs share the same prefix
    ///    before the last `-`.
    ///
    /// Reports without a client order ID, with `linked_order_ids` already set,
    /// or with a non-contingent type are left untouched. A contingent report
    /// for which neither strategy yields a peer is logged and downgraded to
    /// `ContingencyType::NoContingency`.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // First pass: build the groups and record one parent per group.
        for report in reports.iter() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                // `or_insert` keeps the first parent-type report seen for the list.
                if Self::is_parent_contingency(report.contingency_type) {
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            // Prefix grouping only considers contingent orders whose ID contains a `-`.
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Second pass: resolve links for each eligible report.
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Links that are already present are never overwritten.
            if report.linked_order_ids.is_some() {
                continue;
            }

            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            // Strategy 1: peers sharing the same order list ID.
            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id != *parent_id {
                            // Stable sort: the parent moves to the front while the
                            // remaining peers keep their relative order.
                            linked.sort_by_key(
                                |candidate| {
                                    if candidate == parent_id { 0 } else { 1 }
                                },
                            );
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            // This report is the recorded parent itself.
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        order_list_id = ?order_list_id,
                        contingency_type = ?report.contingency_type,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids sourced from order list id",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                // The group only contained this report: fall through to strategy 2.
                tracing::trace!(
                    client_order_id = ?client_order_id,
                    order_list_id = ?order_list_id,
                    contingency_type = ?report.contingency_type,
                    order_list_group = ?group,
                    "BitMEX order list id group had no peers",
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Strategy 2: peers sharing the same client order ID prefix.
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id != *parent_id {
                            // Same parent-first ordering as in strategy 1.
                            linked.sort_by_key(
                                |candidate| {
                                    if candidate == parent_id { 0 } else { 1 }
                                },
                            );
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        contingency_type = ?report.contingency_type,
                        base = base,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids constructed from client order id prefix",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                tracing::trace!(
                    client_order_id = ?client_order_id,
                    contingency_type = ?report.contingency_type,
                    base = base,
                    prefix_group = ?group,
                    "BitMEX client order id prefix group had no peers",
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // Neither strategy found a peer. Non-contingent reports were skipped
            // above, so this check holds for every report that reaches this point.
            if Self::is_contingent_order(report.contingency_type) {
                tracing::warn!(
                    client_order_id = ?report.client_order_id,
                    order_list_id = ?report.order_list_id,
                    contingency_type = ?report.contingency_type,
                    "BitMEX order status report missing linked ids after grouping",
                );
                // Downgrade to a standalone order since no links could be established.
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1157
    /// Cancels all pending requests by delegating to the raw client's
    /// `cancel_all_requests`.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1162
    /// Returns a clone of the raw client's cancellation token.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.inner.cancellation_token().clone()
    }
1167
1168 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1172 self.instruments_cache
1173 .insert(instrument.raw_symbol().inner(), instrument);
1174 self.cache_initialized.store(true, Ordering::Release);
1175 }
1176
1177 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1179 self.instruments_cache
1180 .get(symbol)
1181 .map(|entry| entry.value().clone())
1182 }
1183
1184 pub async fn request_instrument(
1192 &self,
1193 instrument_id: InstrumentId,
1194 ) -> anyhow::Result<Option<InstrumentAny>> {
1195 let response = self
1196 .inner
1197 .get_instrument(instrument_id.symbol.as_str())
1198 .await?;
1199
1200 let instrument = match response {
1201 Some(instrument) => instrument,
1202 None => return Ok(None),
1203 };
1204
1205 let ts_init = self.generate_ts_init();
1206
1207 Ok(parse_instrument_any(&instrument, ts_init))
1208 }
1209
1210 pub async fn request_instruments(
1216 &self,
1217 active_only: bool,
1218 ) -> anyhow::Result<Vec<InstrumentAny>> {
1219 let instruments = self.inner.get_instruments(active_only).await?;
1220 let ts_init = self.generate_ts_init();
1221
1222 let mut parsed_instruments = Vec::new();
1223 let mut failed_count = 0;
1224 let total_count = instruments.len();
1225
1226 for inst in instruments {
1227 if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1228 parsed_instruments.push(instrument_any);
1229 } else {
1230 failed_count += 1;
1231 tracing::error!(
1232 "Failed to parse instrument: symbol={}, type={:?}, state={:?} - instrument will not be cached",
1233 inst.symbol,
1234 inst.instrument_type,
1235 inst.state
1236 );
1237 }
1238 }
1239
1240 if failed_count > 0 {
1241 tracing::error!(
1242 "Instrument parse failures: {} failed out of {} total ({} successfully parsed)",
1243 failed_count,
1244 total_count,
1245 parsed_instruments.len()
1246 );
1247 }
1248
1249 Ok(parsed_instruments)
1250 }
1251
1252 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1262 let inner = self.inner.clone();
1263 inner.get_wallet().await
1264 }
1265
1266 pub async fn get_orders(
1276 &self,
1277 params: GetOrderParams,
1278 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1279 let inner = self.inner.clone();
1280 inner.get_orders(params).await
1281 }
1282
1283 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1289 self.get_instrument(&symbol).ok_or_else(|| {
1290 anyhow::anyhow!(
1291 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1292 )
1293 })
1294 }
1295
1296 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1303 self.instrument_from_cache(symbol)
1304 .map(|instrument| instrument.price_precision())
1305 }
1306
    /// Requests the user margin for `currency` by delegating to the raw client,
    /// converting its error into an `anyhow::Error`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying request fails.
    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
        self.inner
            .get_margin(currency)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    }
1318
1319 pub async fn request_account_state(
1325 &self,
1326 account_id: AccountId,
1327 ) -> anyhow::Result<AccountState> {
1328 let margin = self
1330 .inner
1331 .get_margin("XBt")
1332 .await
1333 .map_err(|e| anyhow::anyhow!(e))?;
1334
1335 let ts_init = nautilus_core::nanos::UnixNanos::from(
1336 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1337 );
1338
1339 let margin_msg = BitmexMarginMsg {
1341 account: margin.account,
1342 currency: margin.currency,
1343 risk_limit: margin.risk_limit,
1344 amount: margin.amount,
1345 prev_realised_pnl: margin.prev_realised_pnl,
1346 gross_comm: margin.gross_comm,
1347 gross_open_cost: margin.gross_open_cost,
1348 gross_open_premium: margin.gross_open_premium,
1349 gross_exec_cost: margin.gross_exec_cost,
1350 gross_mark_value: margin.gross_mark_value,
1351 risk_value: margin.risk_value,
1352 init_margin: margin.init_margin,
1353 maint_margin: margin.maint_margin,
1354 target_excess_margin: margin.target_excess_margin,
1355 realised_pnl: margin.realised_pnl,
1356 unrealised_pnl: margin.unrealised_pnl,
1357 wallet_balance: margin.wallet_balance,
1358 margin_balance: margin.margin_balance,
1359 margin_leverage: margin.margin_leverage,
1360 margin_used_pcnt: margin.margin_used_pcnt,
1361 excess_margin: margin.excess_margin,
1362 available_margin: margin.available_margin,
1363 withdrawable_margin: margin.withdrawable_margin,
1364 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1367 foreign_margin_balance: None,
1368 foreign_requirement: None,
1369 };
1370
1371 parse_account_state(&margin_msg, account_id, ts_init)
1372 }
1373
1374 #[allow(clippy::too_many_arguments)]
1381 pub async fn submit_order(
1382 &self,
1383 instrument_id: InstrumentId,
1384 client_order_id: ClientOrderId,
1385 order_side: OrderSide,
1386 order_type: OrderType,
1387 quantity: Quantity,
1388 time_in_force: TimeInForce,
1389 price: Option<Price>,
1390 trigger_price: Option<Price>,
1391 trigger_type: Option<TriggerType>,
1392 display_qty: Option<Quantity>,
1393 post_only: bool,
1394 reduce_only: bool,
1395 order_list_id: Option<OrderListId>,
1396 contingency_type: Option<ContingencyType>,
1397 ) -> anyhow::Result<OrderStatusReport> {
1398 use crate::common::enums::{
1399 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1400 };
1401
1402 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1403
1404 let mut params = super::query::PostOrderParamsBuilder::default();
1405 params.text(NAUTILUS_TRADER);
1406 params.symbol(instrument_id.symbol.as_str());
1407 params.cl_ord_id(client_order_id.as_str());
1408
1409 let side = BitmexSide::try_from_order_side(order_side)?;
1410 params.side(side);
1411
1412 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1413 params.ord_type(ord_type);
1414
1415 params.order_qty(quantity_to_u32(&quantity, &instrument));
1416
1417 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1418 params.time_in_force(tif);
1419
1420 if let Some(price) = price {
1421 params.price(price.as_f64());
1422 }
1423
1424 if let Some(trigger_price) = trigger_price {
1425 params.stop_px(trigger_price.as_f64());
1426 }
1427
1428 if let Some(display_qty) = display_qty {
1429 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1430 }
1431
1432 if let Some(order_list_id) = order_list_id {
1433 params.cl_ord_link_id(order_list_id.as_str());
1434 }
1435
1436 let mut exec_inst = Vec::new();
1437
1438 if post_only {
1439 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1440 }
1441
1442 if reduce_only {
1443 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1444 }
1445
1446 if trigger_price.is_some()
1447 && let Some(trigger_type) = trigger_type
1448 {
1449 match trigger_type {
1450 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1451 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1452 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1453 _ => {} }
1455 }
1456
1457 if !exec_inst.is_empty() {
1458 params.exec_inst(exec_inst);
1459 }
1460
1461 if let Some(contingency_type) = contingency_type {
1462 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1463 params.contingency_type(bitmex_contingency);
1464 }
1465
1466 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1467
1468 let response = self.inner.place_order(params).await?;
1469
1470 let order: BitmexOrder = serde_json::from_value(response)?;
1471
1472 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1473 let reason = order
1474 .ord_rej_reason
1475 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1476 anyhow::bail!("Order rejected: {reason}");
1477 }
1478
1479 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1480 let ts_init = self.generate_ts_init();
1481
1482 parse_order_status_report(&order, &instrument, ts_init)
1483 }
1484
1485 pub async fn cancel_order(
1495 &self,
1496 instrument_id: InstrumentId,
1497 client_order_id: Option<ClientOrderId>,
1498 venue_order_id: Option<VenueOrderId>,
1499 ) -> anyhow::Result<OrderStatusReport> {
1500 let mut params = super::query::DeleteOrderParamsBuilder::default();
1501 params.text(NAUTILUS_TRADER);
1502
1503 if let Some(venue_order_id) = venue_order_id {
1504 params.order_id(vec![venue_order_id.as_str().to_string()]);
1505 } else if let Some(client_order_id) = client_order_id {
1506 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1507 } else {
1508 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1509 }
1510
1511 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1512
1513 let response = self.inner.cancel_orders(params).await?;
1514
1515 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1516 let order = orders
1517 .into_iter()
1518 .next()
1519 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1520
1521 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1522 let ts_init = self.generate_ts_init();
1523
1524 parse_order_status_report(&order, &instrument, ts_init)
1525 }
1526
1527 pub async fn cancel_orders(
1537 &self,
1538 instrument_id: InstrumentId,
1539 client_order_ids: Option<Vec<ClientOrderId>>,
1540 venue_order_ids: Option<Vec<VenueOrderId>>,
1541 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1542 let mut params = super::query::DeleteOrderParamsBuilder::default();
1543 params.text(NAUTILUS_TRADER);
1544
1545 if let Some(venue_order_ids) = venue_order_ids {
1548 if venue_order_ids.is_empty() {
1549 anyhow::bail!("venue_order_ids cannot be empty");
1550 }
1551 params.order_id(
1552 venue_order_ids
1553 .iter()
1554 .map(|id| id.to_string())
1555 .collect::<Vec<_>>(),
1556 );
1557 } else if let Some(client_order_ids) = client_order_ids {
1558 if client_order_ids.is_empty() {
1559 anyhow::bail!("client_order_ids cannot be empty");
1560 }
1561 params.cl_ord_id(
1562 client_order_ids
1563 .iter()
1564 .map(|id| id.to_string())
1565 .collect::<Vec<_>>(),
1566 );
1567 } else {
1568 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1569 }
1570
1571 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1572
1573 let response = self.inner.cancel_orders(params).await?;
1574
1575 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1576
1577 let ts_init = self.generate_ts_init();
1578 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1579
1580 let mut reports = Vec::new();
1581
1582 for order in orders {
1583 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1584 }
1585
1586 Self::populate_linked_order_ids(&mut reports);
1587
1588 Ok(reports)
1589 }
1590
1591 pub async fn cancel_all_orders(
1601 &self,
1602 instrument_id: InstrumentId,
1603 order_side: Option<OrderSide>,
1604 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1605 let mut params = DeleteAllOrdersParamsBuilder::default();
1606 params.text(NAUTILUS_TRADER);
1607 params.symbol(instrument_id.symbol.as_str());
1608
1609 if let Some(side) = order_side {
1610 let side = BitmexSide::try_from_order_side(side)?;
1611 params.filter(serde_json::json!({
1612 "side": side
1613 }));
1614 }
1615
1616 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1617
1618 let response = self.inner.cancel_all_orders(params).await?;
1619
1620 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1621
1622 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1623 let ts_init = self.generate_ts_init();
1624
1625 let mut reports = Vec::new();
1626
1627 for order in orders {
1628 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1629 }
1630
1631 Self::populate_linked_order_ids(&mut reports);
1632
1633 Ok(reports)
1634 }
1635
1636 pub async fn modify_order(
1647 &self,
1648 instrument_id: InstrumentId,
1649 client_order_id: Option<ClientOrderId>,
1650 venue_order_id: Option<VenueOrderId>,
1651 quantity: Option<Quantity>,
1652 price: Option<Price>,
1653 trigger_price: Option<Price>,
1654 ) -> anyhow::Result<OrderStatusReport> {
1655 let mut params = PutOrderParamsBuilder::default();
1656 params.text(NAUTILUS_TRADER);
1657
1658 if let Some(venue_order_id) = venue_order_id {
1660 params.order_id(venue_order_id.as_str());
1661 } else if let Some(client_order_id) = client_order_id {
1662 params.orig_cl_ord_id(client_order_id.as_str());
1663 } else {
1664 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1665 }
1666
1667 if let Some(quantity) = quantity {
1668 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1669 params.order_qty(quantity_to_u32(&quantity, &instrument));
1670 }
1671
1672 if let Some(price) = price {
1673 params.price(price.as_f64());
1674 }
1675
1676 if let Some(trigger_price) = trigger_price {
1677 params.stop_px(trigger_price.as_f64());
1678 }
1679
1680 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1681
1682 let response = self.inner.amend_order(params).await?;
1683
1684 let order: BitmexOrder = serde_json::from_value(response)?;
1685
1686 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1687 let reason = order
1688 .ord_rej_reason
1689 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1690 anyhow::bail!("Order modification rejected: {reason}");
1691 }
1692
1693 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1694 let ts_init = self.generate_ts_init();
1695
1696 parse_order_status_report(&order, &instrument, ts_init)
1697 }
1698
1699 pub async fn query_order(
1708 &self,
1709 instrument_id: InstrumentId,
1710 client_order_id: Option<ClientOrderId>,
1711 venue_order_id: Option<VenueOrderId>,
1712 ) -> anyhow::Result<Option<OrderStatusReport>> {
1713 let mut params = GetOrderParamsBuilder::default();
1714
1715 let filter_json = if let Some(client_order_id) = client_order_id {
1716 serde_json::json!({
1717 "clOrdID": client_order_id.to_string()
1718 })
1719 } else if let Some(venue_order_id) = venue_order_id {
1720 serde_json::json!({
1721 "orderID": venue_order_id.to_string()
1722 })
1723 } else {
1724 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1725 };
1726
1727 params.filter(filter_json);
1728 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1731
1732 let response = self.inner.get_orders(params).await?;
1733
1734 if response.is_empty() {
1735 return Ok(None);
1736 }
1737
1738 let order = &response[0];
1739
1740 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1741 let ts_init = self.generate_ts_init();
1742
1743 let report = parse_order_status_report(order, &instrument, ts_init)?;
1744
1745 Ok(Some(report))
1746 }
1747
1748 pub async fn request_order_status_report(
1757 &self,
1758 instrument_id: InstrumentId,
1759 client_order_id: Option<ClientOrderId>,
1760 venue_order_id: Option<VenueOrderId>,
1761 ) -> anyhow::Result<OrderStatusReport> {
1762 let mut params = GetOrderParamsBuilder::default();
1763 params.symbol(instrument_id.symbol.as_str());
1764
1765 if let Some(venue_order_id) = venue_order_id {
1766 params.filter(serde_json::json!({
1767 "orderID": venue_order_id.as_str()
1768 }));
1769 } else if let Some(client_order_id) = client_order_id {
1770 params.filter(serde_json::json!({
1771 "clOrdID": client_order_id.as_str()
1772 }));
1773 }
1774
1775 params.count(1i32);
1776 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1777
1778 let response = self.inner.get_orders(params).await?;
1779
1780 let order = response
1781 .into_iter()
1782 .next()
1783 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1784
1785 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1786 let ts_init = self.generate_ts_init();
1787
1788 parse_order_status_report(&order, &instrument, ts_init)
1789 }
1790
1791 pub async fn request_order_status_reports(
1800 &self,
1801 instrument_id: Option<InstrumentId>,
1802 open_only: bool,
1803 limit: Option<u32>,
1804 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1805 let mut params = GetOrderParamsBuilder::default();
1806
1807 if let Some(instrument_id) = &instrument_id {
1808 params.symbol(instrument_id.symbol.as_str());
1809 }
1810
1811 if open_only {
1812 params.filter(serde_json::json!({
1813 "open": true
1814 }));
1815 }
1816
1817 if let Some(limit) = limit {
1818 params.count(limit as i32);
1819 } else {
1820 params.count(500); }
1822
1823 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1826
1827 let response = self.inner.get_orders(params).await?;
1828
1829 let ts_init = self.generate_ts_init();
1830
1831 let mut reports = Vec::new();
1832
1833 for order in response {
1834 let Some(symbol) = order.symbol else {
1836 tracing::warn!("Order response missing symbol, skipping");
1837 continue;
1838 };
1839
1840 let Ok(instrument) = self.instrument_from_cache(symbol) else {
1841 tracing::debug!(
1842 symbol = %symbol,
1843 "Skipping order report for instrument not in cache"
1844 );
1845 continue;
1846 };
1847
1848 match parse_order_status_report(&order, &instrument, ts_init) {
1849 Ok(report) => reports.push(report),
1850 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1851 }
1852 }
1853
1854 Self::populate_linked_order_ids(&mut reports);
1855
1856 Ok(reports)
1857 }
1858
1859 pub async fn request_trades(
1865 &self,
1866 instrument_id: InstrumentId,
1867 start: Option<DateTime<Utc>>,
1868 end: Option<DateTime<Utc>>,
1869 limit: Option<u32>,
1870 ) -> anyhow::Result<Vec<TradeTick>> {
1871 let mut params = GetTradeParamsBuilder::default();
1872 params.symbol(instrument_id.symbol.as_str());
1873
1874 if let Some(start) = start {
1875 params.start_time(start);
1876 }
1877
1878 if let Some(end) = end {
1879 params.end_time(end);
1880 }
1881
1882 if let (Some(start), Some(end)) = (start, end) {
1883 anyhow::ensure!(
1884 start < end,
1885 "Invalid time range: start={start:?} end={end:?}",
1886 );
1887 }
1888
1889 if let Some(limit) = limit {
1890 let clamped_limit = limit.min(1000);
1891 if limit > 1000 {
1892 tracing::warn!(
1893 limit,
1894 clamped_limit,
1895 "BitMEX trade request limit exceeds venue maximum; clamping",
1896 );
1897 }
1898 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1899 }
1900 params.reverse(false);
1901 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1902
1903 let response = self.inner.get_trades(params).await?;
1904
1905 let ts_init = self.generate_ts_init();
1906
1907 let mut parsed_trades = Vec::new();
1908
1909 for trade in response {
1910 if let Some(start) = start
1911 && trade.timestamp < start
1912 {
1913 continue;
1914 }
1915
1916 if let Some(end) = end
1917 && trade.timestamp > end
1918 {
1919 continue;
1920 }
1921
1922 let price_precision = self.get_price_precision(trade.symbol)?;
1923
1924 match parse_trade(trade, price_precision, ts_init) {
1925 Ok(trade) => parsed_trades.push(trade),
1926 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1927 }
1928 }
1929
1930 Ok(parsed_trades)
1931 }
1932
1933 pub async fn request_bars(
1940 &self,
1941 mut bar_type: BarType,
1942 start: Option<DateTime<Utc>>,
1943 end: Option<DateTime<Utc>>,
1944 limit: Option<u32>,
1945 partial: bool,
1946 ) -> anyhow::Result<Vec<Bar>> {
1947 bar_type = bar_type.standard();
1948
1949 anyhow::ensure!(
1950 bar_type.aggregation_source() == AggregationSource::External,
1951 "Only EXTERNAL aggregation bars are supported"
1952 );
1953 anyhow::ensure!(
1954 bar_type.spec().price_type == PriceType::Last,
1955 "Only LAST price type bars are supported"
1956 );
1957 if let (Some(start), Some(end)) = (start, end) {
1958 anyhow::ensure!(
1959 start < end,
1960 "Invalid time range: start={start:?} end={end:?}"
1961 );
1962 }
1963
1964 let spec = bar_type.spec();
1965 let bin_size = match (spec.aggregation, spec.step.get()) {
1966 (BarAggregation::Minute, 1) => "1m",
1967 (BarAggregation::Minute, 5) => "5m",
1968 (BarAggregation::Hour, 1) => "1h",
1969 (BarAggregation::Day, 1) => "1d",
1970 _ => anyhow::bail!(
1971 "BitMEX does not support {}-{:?}-{:?} bars",
1972 spec.step.get(),
1973 spec.aggregation,
1974 spec.price_type,
1975 ),
1976 };
1977
1978 let instrument_id = bar_type.instrument_id();
1979 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1980
1981 let mut params = GetTradeBucketedParamsBuilder::default();
1982 params.symbol(instrument_id.symbol.as_str());
1983 params.bin_size(bin_size);
1984 if partial {
1985 params.partial(true);
1986 }
1987 if let Some(start) = start {
1988 params.start_time(start);
1989 }
1990 if let Some(end) = end {
1991 params.end_time(end);
1992 }
1993 if let Some(limit) = limit {
1994 let clamped_limit = limit.min(1000);
1995 if limit > 1000 {
1996 tracing::warn!(
1997 limit,
1998 clamped_limit,
1999 "BitMEX bar request limit exceeds venue maximum; clamping",
2000 );
2001 }
2002 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2003 }
2004 params.reverse(false);
2005 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2006
2007 let response = self.inner.get_trade_bucketed(params).await?;
2008 let ts_init = self.generate_ts_init();
2009 let mut bars = Vec::new();
2010
2011 for bin in response {
2012 if let Some(start) = start
2013 && bin.timestamp < start
2014 {
2015 continue;
2016 }
2017 if let Some(end) = end
2018 && bin.timestamp > end
2019 {
2020 continue;
2021 }
2022 if bin.symbol != instrument_id.symbol.inner() {
2023 tracing::warn!(
2024 symbol = %bin.symbol,
2025 expected = %instrument_id.symbol,
2026 "Skipping trade bin for unexpected symbol",
2027 );
2028 continue;
2029 }
2030
2031 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2032 Ok(bar) => bars.push(bar),
2033 Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2034 }
2035 }
2036
2037 Ok(bars)
2038 }
2039
2040 pub async fn request_fill_reports(
2046 &self,
2047 instrument_id: Option<InstrumentId>,
2048 limit: Option<u32>,
2049 ) -> anyhow::Result<Vec<FillReport>> {
2050 let mut params = GetExecutionParamsBuilder::default();
2051 if let Some(instrument_id) = instrument_id {
2052 params.symbol(instrument_id.symbol.as_str());
2053 }
2054 if let Some(limit) = limit {
2055 params.count(limit as i32);
2056 } else {
2057 params.count(500); }
2059 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2062
2063 let response = self.inner.get_executions(params).await?;
2064
2065 let ts_init = self.generate_ts_init();
2066
2067 let mut reports = Vec::new();
2068
2069 for exec in response {
2070 let Some(symbol) = exec.symbol else {
2072 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2073 continue;
2074 };
2075 let symbol_str = symbol.to_string();
2076
2077 let instrument = match self.instrument_from_cache(symbol) {
2078 Ok(instrument) => instrument,
2079 Err(e) => {
2080 tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2081 continue;
2082 }
2083 };
2084
2085 match parse_fill_report(exec, &instrument, ts_init) {
2086 Ok(report) => reports.push(report),
2087 Err(e) => {
2088 let error_msg = e.to_string();
2090 if error_msg.starts_with("Skipping non-trade execution")
2091 || error_msg.starts_with("Skipping execution without order_id")
2092 {
2093 tracing::debug!("{e}");
2094 } else {
2095 tracing::error!("Failed to parse fill report: {e}");
2096 }
2097 }
2098 }
2099 }
2100
2101 Ok(reports)
2102 }
2103
2104 pub async fn request_position_status_reports(
2110 &self,
2111 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2112 let params = GetPositionParamsBuilder::default()
2113 .count(500) .build()
2115 .map_err(|e| anyhow::anyhow!(e))?;
2116
2117 let response = self.inner.get_positions(params).await?;
2118
2119 let ts_init = self.generate_ts_init();
2120
2121 let mut reports = Vec::new();
2122
2123 for pos in response {
2124 let symbol = pos.symbol;
2125 let instrument = match self.instrument_from_cache(symbol) {
2126 Ok(instrument) => instrument,
2127 Err(e) => {
2128 tracing::error!(
2129 symbol = pos.symbol.as_str(),
2130 "Instrument not found in cache for position parsing: {e}"
2131 );
2132 continue;
2133 }
2134 };
2135
2136 match parse_position_report(pos, &instrument, ts_init) {
2137 Ok(report) => reports.push(report),
2138 Err(e) => tracing::error!("Failed to parse position report: {e}"),
2139 }
2140 }
2141
2142 Ok(reports)
2143 }
2144
2145 pub async fn update_position_leverage(
2153 &self,
2154 symbol: &str,
2155 leverage: f64,
2156 ) -> anyhow::Result<PositionStatusReport> {
2157 let params = PostPositionLeverageParams {
2158 symbol: symbol.to_string(),
2159 leverage,
2160 target_account_id: None,
2161 };
2162
2163 let response = self.inner.update_position_leverage(params).await?;
2164
2165 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2166 let ts_init = self.generate_ts_init();
2167
2168 parse_position_report(response, &instrument, ts_init)
2169 }
2170}
2171
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    // Builds a raw client with fixed test credentials and a local base URL. The single
    // argument fills the slot that `test_sign_request_uses_custom_recv_window` varies
    // (presumably the recv window in milliseconds; confirm against `with_credentials`).
    // All other optional arguments are `None`.
    macro_rules! raw_test_client {
        ($recv_window:expr) => {
            BitmexRawHttpClient::with_credentials(
                "test_api_key".to_string(),
                "test_api_secret".to_string(),
                "http://localhost:8080".to_string(),
                Some(60),
                None,
                None,
                None,
                $recv_window,
                None,
                None,
                None,
            )
            .expect("Failed to create test client")
        };
    }

    // Creates an accepted BUY limit report for XBTUSD with the given identifiers, contingency
    // type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base_report = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        let with_list = match order_list_id {
            Some(id) => base_report.with_order_list_id(OrderListId::from(id)),
            None => base_report,
        };

        with_list.with_contingency_type(contingency_type)
    }

    // Expected `linked_order_ids` value for the given client order IDs, in order.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let client = raw_test_client!(None);

        let headers = client
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        for expected_key in ["api-key", "api-signature", "api-expires"] {
            assert!(headers.contains_key(expected_key));
        }
        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let client = raw_test_client!(None);

        let payload = json!({"symbol": "XBTUSD", "orderQty": 100});
        let payload_bytes = serde_json::to_vec(&payload).unwrap();

        let unsigned_body = client
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let signed_body = client
            .sign_request(&Method::POST, "/api/v1/order", Some(&payload_bytes))
            .unwrap();

        // Including a body must change the signature.
        assert_ne!(
            unsigned_body.get("api-signature").unwrap(),
            signed_body.get("api-signature").unwrap()
        );
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        let client_default = raw_test_client!(None);
        let client_custom = raw_test_client!(Some(30_000));

        let headers_default = client_default
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

        // Both expiries lie in the future, and the custom one is later.
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        assert!(expires_custom > expires_default);

        // The gap between the two expiries must fall within 18..=25.
        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Each report links to the other two members of the order list.
        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // No order list ID is set; the reports only share a client order ID prefix.
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(
            reports[0].linked_order_ids,
            linked(&[stop.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            linked(&[entry.as_str(), take.as_str()]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            linked(&[entry.as_str(), stop.as_str()]),
        );
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // The non-contingent order gets no links.
        assert!(reports[0].linked_order_ids.is_none());

        // The second order gets no links either and ends up as `NoContingency`.
        assert!(reports[1].linked_order_ids.is_none());
        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
    }
}