1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46 TimeInForce, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65 error::{BitmexErrorResponse, BitmexHttpError},
66 models::{
67 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69 },
70 query::{
71 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74 PostPositionLeverageParams, PutOrderParams,
75 },
76};
77use crate::{
78 common::{
79 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80 credential::Credential,
81 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82 parse::{parse_account_state, quantity_to_u32},
83 },
84 http::{
85 parse::{
86 InstrumentParseResult, parse_fill_report, parse_instrument_any,
87 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88 },
89 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90 },
91 websocket::messages::BitmexMarginMsg,
92};
93
/// Per-second request quota applied when the caller does not supply one.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Per-minute request quota used by default when the client is built with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Per-minute request quota used by default when the client is built without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";

/// Interned rate limiter keys, built once on first use and cloned for each request.
static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
    vec![
        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
        Ustr::from(BITMEX_MINUTE_RATE_KEY),
    ]
});
112
/// Generic response envelope carrying a list of `T` items under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The items carried by the response.
    pub data: Vec<T>,
}
119
/// Low-level BitMEX HTTP client: builds URLs, optionally signs requests, applies
/// rate limiting keys and retries, and deserializes responses into venue models.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Prefix prepended to every endpoint path to form the request URL.
    base_url: String,
    /// Underlying HTTP client, configured with the rate limiter quotas.
    client: HttpClient,
    /// API credential used for request signing; `None` for unauthenticated clients.
    credential: Option<Credential>,
    /// Receive window in milliseconds; converted to whole seconds when computing `api-expires`.
    recv_window_ms: u64,
    /// Retry manager wrapping every request sent through this client.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
}
148
149impl Default for BitmexRawHttpClient {
150 fn default() -> Self {
151 Self::new(None, Some(60), None, None, None, None, None, None, None)
152 .expect("Failed to create default BitmexHttpInnerClient")
153 }
154}
155
156impl BitmexRawHttpClient {
157 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 base_url: Option<String>,
169 timeout_secs: Option<u64>,
170 max_retries: Option<u32>,
171 retry_delay_ms: Option<u64>,
172 retry_delay_max_ms: Option<u64>,
173 recv_window_ms: Option<u64>,
174 max_requests_per_second: Option<u32>,
175 max_requests_per_minute: Option<u32>,
176 proxy_url: Option<String>,
177 ) -> Result<Self, BitmexHttpError> {
178 let retry_config = RetryConfig {
179 max_retries: max_retries.unwrap_or(3),
180 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182 backoff_factor: 2.0,
183 jitter_ms: 1000,
184 operation_timeout_ms: Some(60_000),
185 immediate_first: false,
186 max_elapsed_ms: Some(180_000),
187 };
188
189 let retry_manager = RetryManager::new(retry_config);
190
191 let max_req_per_sec =
192 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193 let max_req_per_min =
194 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196 Ok(Self {
197 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198 client: HttpClient::new(
199 Self::default_headers(),
200 vec![],
201 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202 Some(Self::default_quota(max_req_per_sec)),
203 timeout_secs,
204 proxy_url,
205 )
206 .map_err(|e| {
207 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208 })?,
209 credential: None,
210 recv_window_ms: recv_window_ms.unwrap_or(10_000),
211 retry_manager,
212 cancellation_token: CancellationToken::new(),
213 })
214 }
215
216 #[allow(clippy::too_many_arguments)]
223 pub fn with_credentials(
224 api_key: String,
225 api_secret: String,
226 base_url: String,
227 timeout_secs: Option<u64>,
228 max_retries: Option<u32>,
229 retry_delay_ms: Option<u64>,
230 retry_delay_max_ms: Option<u64>,
231 recv_window_ms: Option<u64>,
232 max_requests_per_second: Option<u32>,
233 max_requests_per_minute: Option<u32>,
234 proxy_url: Option<String>,
235 ) -> Result<Self, BitmexHttpError> {
236 let retry_config = RetryConfig {
237 max_retries: max_retries.unwrap_or(3),
238 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240 backoff_factor: 2.0,
241 jitter_ms: 1000,
242 operation_timeout_ms: Some(60_000),
243 immediate_first: false,
244 max_elapsed_ms: Some(180_000),
245 };
246
247 let retry_manager = RetryManager::new(retry_config);
248
249 let max_req_per_sec =
250 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251 let max_req_per_min =
252 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254 Ok(Self {
255 base_url,
256 client: HttpClient::new(
257 Self::default_headers(),
258 vec![],
259 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260 Some(Self::default_quota(max_req_per_sec)),
261 timeout_secs,
262 proxy_url,
263 )
264 .map_err(|e| {
265 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266 })?,
267 credential: Some(Credential::new(api_key, api_secret)),
268 recv_window_ms: recv_window_ms.unwrap_or(10_000),
269 retry_manager,
270 cancellation_token: CancellationToken::new(),
271 })
272 }
273
274 fn default_headers() -> HashMap<String, String> {
275 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276 }
277
278 fn default_quota(max_requests_per_second: u32) -> Quota {
279 Quota::per_second(
280 NonZeroU32::new(max_requests_per_second)
281 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282 )
283 }
284
285 fn rate_limiter_quotas(
286 max_requests_per_second: u32,
287 max_requests_per_minute: u32,
288 ) -> Vec<(String, Quota)> {
289 let per_sec_quota = Quota::per_second(
290 NonZeroU32::new(max_requests_per_second)
291 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292 );
293 let per_min_quota =
294 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296 }));
297
298 vec![
299 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301 ]
302 }
303
304 fn rate_limit_keys() -> Vec<Ustr> {
305 RATE_LIMIT_KEYS.clone()
306 }
307
308 pub fn cancel_all_requests(&self) {
310 self.cancellation_token.cancel();
311 }
312
313 pub fn cancellation_token(&self) -> &CancellationToken {
315 &self.cancellation_token
316 }
317
318 fn sign_request(
319 &self,
320 method: &Method,
321 endpoint: &str,
322 body: Option<&[u8]>,
323 ) -> Result<HashMap<String, String>, BitmexHttpError> {
324 let credential = self
325 .credential
326 .as_ref()
327 .ok_or(BitmexHttpError::MissingCredentials)?;
328
329 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332 let full_path = if endpoint.starts_with("/api/v1") {
333 endpoint.to_string()
334 } else {
335 format!("/api/v1{endpoint}")
336 };
337
338 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340 let mut headers = HashMap::new();
341 headers.insert("api-expires".to_string(), expires.to_string());
342 headers.insert("api-key".to_string(), credential.api_key.to_string());
343 headers.insert("api-signature".to_string(), signature);
344
345 if body.is_some()
347 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348 {
349 headers.insert(
350 "Content-Type".to_string(),
351 "application/x-www-form-urlencoded".to_string(),
352 );
353 }
354
355 Ok(headers)
356 }
357
358 async fn send_request<T: DeserializeOwned, P: Serialize>(
359 &self,
360 method: Method,
361 endpoint: &str,
362 params: Option<&P>,
363 body: Option<Vec<u8>>,
364 authenticate: bool,
365 ) -> Result<T, BitmexHttpError> {
366 let endpoint = endpoint.to_string();
367 let method_clone = method.clone();
368 let body_clone = body.clone();
369
370 let params_str = if method == Method::GET || method == Method::DELETE {
373 params
374 .map(serde_urlencoded::to_string)
375 .transpose()
376 .map_err(|e| {
377 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378 })?
379 } else {
380 None
381 };
382
383 let full_endpoint = match params_str {
384 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385 _ => endpoint.clone(),
386 };
387
388 let url = format!("{}{}", self.base_url, full_endpoint);
389
390 let operation = || {
391 let url = url.clone();
392 let method = method_clone.clone();
393 let body = body_clone.clone();
394 let full_endpoint = full_endpoint.clone();
395
396 async move {
397 let headers = if authenticate {
398 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399 } else {
400 None
401 };
402
403 let rate_keys = Self::rate_limit_keys();
404 let resp = self
405 .client
406 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407 .await?;
408
409 if resp.status.is_success() {
410 serde_json::from_slice(&resp.body).map_err(Into::into)
411 } else if let Ok(error_resp) =
412 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413 {
414 Err(error_resp.into())
415 } else {
416 Err(BitmexHttpError::UnexpectedStatus {
417 status: StatusCode::from_u16(resp.status.as_u16())
418 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419 body: String::from_utf8_lossy(&resp.body).to_string(),
420 })
421 }
422 }
423 };
424
425 let should_retry = |error: &BitmexHttpError| -> bool {
442 match error {
443 BitmexHttpError::NetworkError(_) => true,
444 BitmexHttpError::UnexpectedStatus { status, .. } => {
445 status.as_u16() >= 500 || status.as_u16() == 429
446 }
447 BitmexHttpError::BitmexError {
448 error_name,
449 message,
450 } => {
451 error_name == "RateLimitError"
452 || (error_name == "HTTPError"
453 && message.to_lowercase().contains("rate limit"))
454 }
455 _ => false,
456 }
457 };
458
459 let create_error = |msg: String| -> BitmexHttpError {
460 if msg == "canceled" {
461 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462 } else {
463 BitmexHttpError::NetworkError(msg)
464 }
465 };
466
467 self.retry_manager
468 .execute_with_retry_with_cancel(
469 endpoint.as_str(),
470 operation,
471 should_retry,
472 create_error,
473 &self.cancellation_token,
474 )
475 .await
476 }
477
478 pub async fn get_instruments(
484 &self,
485 active_only: bool,
486 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487 let path = if active_only {
488 "/instrument/active"
489 } else {
490 "/instrument"
491 };
492 self.send_request::<_, ()>(Method::GET, path, None, None, false)
493 .await
494 }
495
496 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506 let response: BitmexApiInfo = self
507 .send_request::<_, ()>(Method::GET, "", None, None, false)
508 .await?;
509 Ok(response.timestamp)
510 }
511
512 pub async fn get_instrument(
523 &self,
524 symbol: &str,
525 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526 let path = &format!("/instrument?symbol={symbol}");
527 let instruments: Vec<BitmexInstrument> = self
528 .send_request::<_, ()>(Method::GET, path, None, None, false)
529 .await?;
530
531 Ok(instruments.into_iter().next())
532 }
533
534 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540 let endpoint = "/user/wallet";
541 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542 .await
543 }
544
545 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551 let path = format!("/user/margin?currency={currency}");
552 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553 .await
554 }
555
556 pub async fn get_trades(
566 &self,
567 params: GetTradeParams,
568 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
570 .await
571 }
572
573 pub async fn get_trade_bucketed(
579 &self,
580 params: GetTradeBucketedParams,
581 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
583 .await
584 }
585
586 pub async fn get_orders(
596 &self,
597 params: GetOrderParams,
598 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
600 .await
601 }
602
603 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613 let body = serde_urlencoded::to_string(¶ms)
615 .map_err(|e| {
616 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617 })?
618 .into_bytes();
619 let path = "/order";
620 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621 .await
622 }
623
624 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634 let body = serde_urlencoded::to_string(¶ms)
636 .map_err(|e| {
637 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638 })?
639 .into_bytes();
640 let path = "/order";
641 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642 .await
643 }
644
645 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655 let body = serde_urlencoded::to_string(¶ms)
657 .map_err(|e| {
658 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659 })?
660 .into_bytes();
661 let path = "/order";
662 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663 .await
664 }
665
666 pub async fn cancel_all_orders(
680 &self,
681 params: DeleteAllOrdersParams,
682 ) -> Result<Value, BitmexHttpError> {
683 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
684 .await
685 }
686
687 pub async fn get_executions(
697 &self,
698 params: GetExecutionParams,
699 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
701 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702 })?;
703 let path = format!("/execution/tradeHistory?{query}");
704 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705 .await
706 }
707
708 pub async fn get_positions(
718 &self,
719 params: GetPositionParams,
720 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
722 .await
723 }
724
725 pub async fn update_position_leverage(
735 &self,
736 params: PostPositionLeverageParams,
737 ) -> Result<BitmexPosition, BitmexHttpError> {
738 let body = serde_urlencoded::to_string(¶ms)
740 .map_err(|e| {
741 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742 })?
743 .into_bytes();
744 let path = "/position/leverage";
745 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746 .await
747 }
748}
749
/// Higher-level BitMEX HTTP client that wraps a shared [`BitmexRawHttpClient`]
/// and keeps a cache of parsed instruments.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexHttpClient {
    /// Shared raw client that performs the actual HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Cached instruments keyed by raw symbol; shared between clones.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once an instrument has been cached through `cache_instrument`.
    cache_initialized: AtomicBool,
}
764
765impl Clone for BitmexHttpClient {
766 fn clone(&self) -> Self {
767 let cache_initialized = AtomicBool::new(false);
768
769 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770 if is_initialized {
771 cache_initialized.store(true, Ordering::Release);
772 }
773
774 Self {
775 inner: self.inner.clone(),
776 instruments_cache: self.instruments_cache.clone(),
777 cache_initialized,
778 }
779 }
780}
781
782impl Default for BitmexHttpClient {
783 fn default() -> Self {
784 Self::new(
785 None,
786 None,
787 None,
788 false,
789 Some(60),
790 None,
791 None,
792 None,
793 None,
794 None,
795 None,
796 None, )
798 .expect("Failed to create default BitmexHttpClient")
799 }
800}
801
802impl BitmexHttpClient {
803 #[allow(clippy::too_many_arguments)]
809 pub fn new(
810 base_url: Option<String>,
811 api_key: Option<String>,
812 api_secret: Option<String>,
813 testnet: bool,
814 timeout_secs: Option<u64>,
815 max_retries: Option<u32>,
816 retry_delay_ms: Option<u64>,
817 retry_delay_max_ms: Option<u64>,
818 recv_window_ms: Option<u64>,
819 max_requests_per_second: Option<u32>,
820 max_requests_per_minute: Option<u32>,
821 proxy_url: Option<String>,
822 ) -> Result<Self, BitmexHttpError> {
823 let url = base_url.unwrap_or_else(|| {
825 if testnet {
826 BITMEX_HTTP_TESTNET_URL.to_string()
827 } else {
828 BITMEX_HTTP_URL.to_string()
829 }
830 });
831
832 let inner = match (api_key, api_secret) {
833 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834 key,
835 secret,
836 url,
837 timeout_secs,
838 max_retries,
839 retry_delay_ms,
840 retry_delay_max_ms,
841 recv_window_ms,
842 max_requests_per_second,
843 max_requests_per_minute,
844 proxy_url,
845 )?,
846 _ => BitmexRawHttpClient::new(
847 Some(url),
848 timeout_secs,
849 max_retries,
850 retry_delay_ms,
851 retry_delay_max_ms,
852 recv_window_ms,
853 max_requests_per_second,
854 max_requests_per_minute,
855 proxy_url,
856 )?,
857 };
858
859 Ok(Self {
860 inner: Arc::new(inner),
861 instruments_cache: Arc::new(DashMap::new()),
862 cache_initialized: AtomicBool::new(false),
863 })
864 }
865
866 pub fn from_env() -> anyhow::Result<Self> {
873 Self::with_credentials(
874 None, None, None, None, None, None, None, None, None, None, None,
875 )
876 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
877 }
878
879 #[allow(clippy::too_many_arguments)]
889 pub fn with_credentials(
890 api_key: Option<String>,
891 api_secret: Option<String>,
892 base_url: Option<String>,
893 timeout_secs: Option<u64>,
894 max_retries: Option<u32>,
895 retry_delay_ms: Option<u64>,
896 retry_delay_max_ms: Option<u64>,
897 recv_window_ms: Option<u64>,
898 max_requests_per_second: Option<u32>,
899 max_requests_per_minute: Option<u32>,
900 proxy_url: Option<String>,
901 ) -> anyhow::Result<Self> {
902 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905 let (key_var, secret_var) = if testnet {
907 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908 } else {
909 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910 };
911
912 let api_key = get_or_env_var_opt(api_key, key_var);
913 let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915 if api_key.is_some() && api_secret.is_none() {
917 anyhow::bail!("{secret_var} is required when {key_var} is provided");
918 }
919 if api_key.is_none() && api_secret.is_some() {
920 anyhow::bail!("{key_var} is required when {secret_var} is provided");
921 }
922
923 Self::new(
924 base_url,
925 api_key,
926 api_secret,
927 testnet,
928 timeout_secs,
929 max_retries,
930 retry_delay_ms,
931 retry_delay_max_ms,
932 recv_window_ms,
933 max_requests_per_second,
934 max_requests_per_minute,
935 proxy_url,
936 )
937 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938 }
939
940 #[must_use]
942 pub fn base_url(&self) -> &str {
943 self.inner.base_url.as_str()
944 }
945
946 #[must_use]
948 pub fn api_key(&self) -> Option<&str> {
949 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
950 }
951
952 #[must_use]
954 pub fn api_key_masked(&self) -> Option<String> {
955 self.inner.credential.as_ref().map(|c| c.api_key_masked())
956 }
957
958 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
966 self.inner.get_server_time().await
967 }
968
969 fn generate_ts_init(&self) -> UnixNanos {
971 get_atomic_clock_realtime().get_time_ns()
972 }
973
974 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
976 matches!(
977 contingency_type,
978 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
979 )
980 }
981
    /// Returns `true` when `contingency_type` is `Oco` or `Oto`, the types this
    /// client treats as marking the parent order of a linked group.
    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
        matches!(
            contingency_type,
            ContingencyType::Oco | ContingencyType::Oto
        )
    }
989
    /// Fills in `linked_order_ids` and `parent_order_id` on contingent order
    /// status reports by grouping the reports against each other.
    ///
    /// Two groupings are built in a first pass over `reports`:
    /// - by `order_list_id` (all reports that carry one), and
    /// - by client order ID prefix, i.e. the text before the last `-`
    ///   (contingent reports only).
    ///
    /// In each grouping the first `Oco`/`Oto` report seen becomes the group's parent.
    ///
    /// A second pass then resolves each contingent report that has a client
    /// order ID and no linked IDs yet: the order list group is tried first, then
    /// the prefix group. If neither yields any peers, the report is downgraded
    /// to `NoContingency` with no linked or parent IDs, and a warning is logged.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // First pass: collect group membership and the parent of each group.
        for report in reports.iter() {
            // Reports without a client order ID cannot be linked.
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                // `or_insert` keeps the first parent candidate encountered.
                if Self::is_parent_contingency(report.contingency_type) {
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            // Prefix grouping only considers contingent orders whose ID contains a `-`.
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Second pass: resolve links for each report.
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Leave reports that already carry linked IDs untouched.
            if report.linked_order_ids.is_some() {
                continue;
            }

            // Non-contingent reports are left untouched.
            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            // Preferred source: peers sharing the same order list ID.
            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id != *parent_id {
                            // Stable sort moving the parent to the front of the peers.
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            // The parent itself has no parent.
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        order_list_id = ?order_list_id,
                        contingency_type = ?report.contingency_type,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids sourced from order list id",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                // Group contained only this report: fall through to the prefix grouping.
                tracing::trace!(
                    client_order_id = ?client_order_id,
                    order_list_id = ?order_list_id,
                    contingency_type = ?report.contingency_type,
                    order_list_group = ?group,
                    "BitMEX order list id group had no peers",
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Fallback source: peers sharing the same client order ID prefix.
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id != *parent_id {
                            // Stable sort moving the parent to the front of the peers.
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        } else {
                            report.parent_order_id = None;
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    tracing::trace!(
                        client_order_id = ?client_order_id,
                        contingency_type = ?report.contingency_type,
                        base = base,
                        linked_order_ids = ?linked,
                        "BitMEX linked ids constructed from client order id prefix",
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                tracing::trace!(
                    client_order_id = ?client_order_id,
                    contingency_type = ?report.contingency_type,
                    base = base,
                    prefix_group = ?group,
                    "BitMEX client order id prefix group had no peers",
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // No peers found by either grouping: downgrade the report so it no
            // longer claims a contingency it cannot reference.
            // NOTE(review): this check looks always true here, since non-contingent
            // reports were skipped above — confirm before simplifying.
            if Self::is_contingent_order(report.contingency_type) {
                tracing::warn!(
                    client_order_id = ?report.client_order_id,
                    order_list_id = ?report.order_list_id,
                    contingency_type = ?report.contingency_type,
                    "BitMEX order status report missing linked ids after grouping",
                );
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1147
1148 pub fn cancel_all_requests(&self) {
1150 self.inner.cancel_all_requests();
1151 }
1152
1153 pub fn cancellation_token(&self) -> CancellationToken {
1155 self.inner.cancellation_token().clone()
1156 }
1157
1158 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1162 self.instruments_cache
1163 .insert(instrument.raw_symbol().inner(), instrument);
1164 self.cache_initialized.store(true, Ordering::Release);
1165 }
1166
1167 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169 self.instruments_cache
1170 .get(symbol)
1171 .map(|entry| entry.value().clone())
1172 }
1173
1174 pub async fn request_instrument(
1182 &self,
1183 instrument_id: InstrumentId,
1184 ) -> anyhow::Result<Option<InstrumentAny>> {
1185 let response = self
1186 .inner
1187 .get_instrument(instrument_id.symbol.as_str())
1188 .await?;
1189
1190 let instrument = match response {
1191 Some(instrument) => instrument,
1192 None => return Ok(None),
1193 };
1194
1195 let ts_init = self.generate_ts_init();
1196
1197 match parse_instrument_any(&instrument, ts_init) {
1198 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199 InstrumentParseResult::Unsupported {
1200 symbol,
1201 instrument_type,
1202 } => {
1203 tracing::debug!(
1204 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205 );
1206 Ok(None)
1207 }
1208 InstrumentParseResult::Failed {
1209 symbol,
1210 instrument_type,
1211 error,
1212 } => {
1213 tracing::error!(
1214 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1215 );
1216 Ok(None)
1217 }
1218 }
1219 }
1220
1221 pub async fn request_instruments(
1227 &self,
1228 active_only: bool,
1229 ) -> anyhow::Result<Vec<InstrumentAny>> {
1230 let instruments = self.inner.get_instruments(active_only).await?;
1231 let ts_init = self.generate_ts_init();
1232
1233 let mut parsed_instruments = Vec::new();
1234 let mut skipped_count = 0;
1235 let mut failed_count = 0;
1236 let total_count = instruments.len();
1237
1238 for inst in instruments {
1239 match parse_instrument_any(&inst, ts_init) {
1240 InstrumentParseResult::Ok(instrument_any) => {
1241 parsed_instruments.push(*instrument_any);
1242 }
1243 InstrumentParseResult::Unsupported {
1244 symbol,
1245 instrument_type,
1246 } => {
1247 skipped_count += 1;
1248 tracing::debug!(
1249 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1250 );
1251 }
1252 InstrumentParseResult::Failed {
1253 symbol,
1254 instrument_type,
1255 error,
1256 } => {
1257 failed_count += 1;
1258 tracing::error!(
1259 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1260 );
1261 }
1262 }
1263 }
1264
1265 if skipped_count > 0 {
1266 tracing::info!(
1267 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1268 );
1269 }
1270
1271 if failed_count > 0 {
1272 tracing::error!(
1273 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1274 parsed_instruments.len()
1275 );
1276 }
1277
1278 Ok(parsed_instruments)
1279 }
1280
1281 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1291 let inner = self.inner.clone();
1292 inner.get_wallet().await
1293 }
1294
1295 pub async fn get_orders(
1305 &self,
1306 params: GetOrderParams,
1307 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1308 let inner = self.inner.clone();
1309 inner.get_orders(params).await
1310 }
1311
1312 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1318 self.get_instrument(&symbol).ok_or_else(|| {
1319 anyhow::anyhow!(
1320 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1321 )
1322 })
1323 }
1324
1325 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1332 self.instrument_from_cache(symbol)
1333 .map(|instrument| instrument.price_precision())
1334 }
1335
1336 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1342 self.inner
1343 .get_margin(currency)
1344 .await
1345 .map_err(|e| anyhow::anyhow!(e))
1346 }
1347
1348 pub async fn request_account_state(
1354 &self,
1355 account_id: AccountId,
1356 ) -> anyhow::Result<AccountState> {
1357 let margin = self
1359 .inner
1360 .get_margin("XBt")
1361 .await
1362 .map_err(|e| anyhow::anyhow!(e))?;
1363
1364 let ts_init =
1365 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1366
1367 let margin_msg = BitmexMarginMsg {
1369 account: margin.account,
1370 currency: margin.currency,
1371 risk_limit: margin.risk_limit,
1372 amount: margin.amount,
1373 prev_realised_pnl: margin.prev_realised_pnl,
1374 gross_comm: margin.gross_comm,
1375 gross_open_cost: margin.gross_open_cost,
1376 gross_open_premium: margin.gross_open_premium,
1377 gross_exec_cost: margin.gross_exec_cost,
1378 gross_mark_value: margin.gross_mark_value,
1379 risk_value: margin.risk_value,
1380 init_margin: margin.init_margin,
1381 maint_margin: margin.maint_margin,
1382 target_excess_margin: margin.target_excess_margin,
1383 realised_pnl: margin.realised_pnl,
1384 unrealised_pnl: margin.unrealised_pnl,
1385 wallet_balance: margin.wallet_balance,
1386 margin_balance: margin.margin_balance,
1387 margin_leverage: margin.margin_leverage,
1388 margin_used_pcnt: margin.margin_used_pcnt,
1389 excess_margin: margin.excess_margin,
1390 available_margin: margin.available_margin,
1391 withdrawable_margin: margin.withdrawable_margin,
1392 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1395 foreign_margin_balance: None,
1396 foreign_requirement: None,
1397 };
1398
1399 parse_account_state(&margin_msg, account_id, ts_init)
1400 }
1401
1402 #[allow(clippy::too_many_arguments)]
1409 pub async fn submit_order(
1410 &self,
1411 instrument_id: InstrumentId,
1412 client_order_id: ClientOrderId,
1413 order_side: OrderSide,
1414 order_type: OrderType,
1415 quantity: Quantity,
1416 time_in_force: TimeInForce,
1417 price: Option<Price>,
1418 trigger_price: Option<Price>,
1419 trigger_type: Option<TriggerType>,
1420 display_qty: Option<Quantity>,
1421 post_only: bool,
1422 reduce_only: bool,
1423 order_list_id: Option<OrderListId>,
1424 contingency_type: Option<ContingencyType>,
1425 ) -> anyhow::Result<OrderStatusReport> {
1426 use crate::common::enums::{
1427 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1428 };
1429
1430 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1431
1432 let mut params = super::query::PostOrderParamsBuilder::default();
1433 params.text(NAUTILUS_TRADER);
1434 params.symbol(instrument_id.symbol.as_str());
1435 params.cl_ord_id(client_order_id.as_str());
1436
1437 let side = BitmexSide::try_from_order_side(order_side)?;
1438 params.side(side);
1439
1440 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1441 params.ord_type(ord_type);
1442
1443 params.order_qty(quantity_to_u32(&quantity, &instrument));
1444
1445 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1446 params.time_in_force(tif);
1447
1448 if let Some(price) = price {
1449 params.price(price.as_f64());
1450 }
1451
1452 if let Some(trigger_price) = trigger_price {
1453 params.stop_px(trigger_price.as_f64());
1454 }
1455
1456 if let Some(display_qty) = display_qty {
1457 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1458 }
1459
1460 if let Some(order_list_id) = order_list_id {
1461 params.cl_ord_link_id(order_list_id.as_str());
1462 }
1463
1464 let mut exec_inst = Vec::new();
1465
1466 if post_only {
1467 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1468 }
1469
1470 if reduce_only {
1471 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1472 }
1473
1474 if trigger_price.is_some()
1475 && let Some(trigger_type) = trigger_type
1476 {
1477 match trigger_type {
1478 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1479 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1480 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1481 _ => {} }
1483 }
1484
1485 if !exec_inst.is_empty() {
1486 params.exec_inst(exec_inst);
1487 }
1488
1489 if let Some(contingency_type) = contingency_type {
1490 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1491 params.contingency_type(bitmex_contingency);
1492 }
1493
1494 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1495
1496 let response = self.inner.place_order(params).await?;
1497
1498 let order: BitmexOrder = serde_json::from_value(response)?;
1499
1500 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1501 let reason = order
1502 .ord_rej_reason
1503 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1504 anyhow::bail!("Order rejected: {reason}");
1505 }
1506
1507 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1508 let ts_init = self.generate_ts_init();
1509
1510 parse_order_status_report(&order, &instrument, ts_init)
1511 }
1512
1513 pub async fn cancel_order(
1523 &self,
1524 instrument_id: InstrumentId,
1525 client_order_id: Option<ClientOrderId>,
1526 venue_order_id: Option<VenueOrderId>,
1527 ) -> anyhow::Result<OrderStatusReport> {
1528 let mut params = super::query::DeleteOrderParamsBuilder::default();
1529 params.text(NAUTILUS_TRADER);
1530
1531 if let Some(venue_order_id) = venue_order_id {
1532 params.order_id(vec![venue_order_id.as_str().to_string()]);
1533 } else if let Some(client_order_id) = client_order_id {
1534 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1535 } else {
1536 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1537 }
1538
1539 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1540
1541 let response = self.inner.cancel_orders(params).await?;
1542
1543 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1544 let order = orders
1545 .into_iter()
1546 .next()
1547 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1548
1549 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1550 let ts_init = self.generate_ts_init();
1551
1552 parse_order_status_report(&order, &instrument, ts_init)
1553 }
1554
1555 pub async fn cancel_orders(
1565 &self,
1566 instrument_id: InstrumentId,
1567 client_order_ids: Option<Vec<ClientOrderId>>,
1568 venue_order_ids: Option<Vec<VenueOrderId>>,
1569 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1570 let mut params = super::query::DeleteOrderParamsBuilder::default();
1571 params.text(NAUTILUS_TRADER);
1572
1573 if let Some(venue_order_ids) = venue_order_ids {
1576 if venue_order_ids.is_empty() {
1577 anyhow::bail!("venue_order_ids cannot be empty");
1578 }
1579 params.order_id(
1580 venue_order_ids
1581 .iter()
1582 .map(|id| id.to_string())
1583 .collect::<Vec<_>>(),
1584 );
1585 } else if let Some(client_order_ids) = client_order_ids {
1586 if client_order_ids.is_empty() {
1587 anyhow::bail!("client_order_ids cannot be empty");
1588 }
1589 params.cl_ord_id(
1590 client_order_ids
1591 .iter()
1592 .map(|id| id.to_string())
1593 .collect::<Vec<_>>(),
1594 );
1595 } else {
1596 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1597 }
1598
1599 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1600
1601 let response = self.inner.cancel_orders(params).await?;
1602
1603 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1604
1605 let ts_init = self.generate_ts_init();
1606 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1607
1608 let mut reports = Vec::new();
1609
1610 for order in orders {
1611 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1612 }
1613
1614 Self::populate_linked_order_ids(&mut reports);
1615
1616 Ok(reports)
1617 }
1618
1619 pub async fn cancel_all_orders(
1629 &self,
1630 instrument_id: InstrumentId,
1631 order_side: Option<OrderSide>,
1632 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1633 let mut params = DeleteAllOrdersParamsBuilder::default();
1634 params.text(NAUTILUS_TRADER);
1635 params.symbol(instrument_id.symbol.as_str());
1636
1637 if let Some(side) = order_side {
1638 let side = BitmexSide::try_from_order_side(side)?;
1639 params.filter(serde_json::json!({
1640 "side": side
1641 }));
1642 }
1643
1644 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1645
1646 let response = self.inner.cancel_all_orders(params).await?;
1647
1648 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1649
1650 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1651 let ts_init = self.generate_ts_init();
1652
1653 let mut reports = Vec::new();
1654
1655 for order in orders {
1656 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1657 }
1658
1659 Self::populate_linked_order_ids(&mut reports);
1660
1661 Ok(reports)
1662 }
1663
1664 pub async fn modify_order(
1675 &self,
1676 instrument_id: InstrumentId,
1677 client_order_id: Option<ClientOrderId>,
1678 venue_order_id: Option<VenueOrderId>,
1679 quantity: Option<Quantity>,
1680 price: Option<Price>,
1681 trigger_price: Option<Price>,
1682 ) -> anyhow::Result<OrderStatusReport> {
1683 let mut params = PutOrderParamsBuilder::default();
1684 params.text(NAUTILUS_TRADER);
1685
1686 if let Some(venue_order_id) = venue_order_id {
1688 params.order_id(venue_order_id.as_str());
1689 } else if let Some(client_order_id) = client_order_id {
1690 params.orig_cl_ord_id(client_order_id.as_str());
1691 } else {
1692 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1693 }
1694
1695 if let Some(quantity) = quantity {
1696 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1697 params.order_qty(quantity_to_u32(&quantity, &instrument));
1698 }
1699
1700 if let Some(price) = price {
1701 params.price(price.as_f64());
1702 }
1703
1704 if let Some(trigger_price) = trigger_price {
1705 params.stop_px(trigger_price.as_f64());
1706 }
1707
1708 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1709
1710 let response = self.inner.amend_order(params).await?;
1711
1712 let order: BitmexOrder = serde_json::from_value(response)?;
1713
1714 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1715 let reason = order
1716 .ord_rej_reason
1717 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1718 anyhow::bail!("Order modification rejected: {reason}");
1719 }
1720
1721 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1722 let ts_init = self.generate_ts_init();
1723
1724 parse_order_status_report(&order, &instrument, ts_init)
1725 }
1726
1727 pub async fn query_order(
1736 &self,
1737 instrument_id: InstrumentId,
1738 client_order_id: Option<ClientOrderId>,
1739 venue_order_id: Option<VenueOrderId>,
1740 ) -> anyhow::Result<Option<OrderStatusReport>> {
1741 let mut params = GetOrderParamsBuilder::default();
1742
1743 let filter_json = if let Some(client_order_id) = client_order_id {
1744 serde_json::json!({
1745 "clOrdID": client_order_id.to_string()
1746 })
1747 } else if let Some(venue_order_id) = venue_order_id {
1748 serde_json::json!({
1749 "orderID": venue_order_id.to_string()
1750 })
1751 } else {
1752 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1753 };
1754
1755 params.filter(filter_json);
1756 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1759
1760 let response = self.inner.get_orders(params).await?;
1761
1762 if response.is_empty() {
1763 return Ok(None);
1764 }
1765
1766 let order = &response[0];
1767
1768 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1769 let ts_init = self.generate_ts_init();
1770
1771 let report = parse_order_status_report(order, &instrument, ts_init)?;
1772
1773 Ok(Some(report))
1774 }
1775
1776 pub async fn request_order_status_report(
1785 &self,
1786 instrument_id: InstrumentId,
1787 client_order_id: Option<ClientOrderId>,
1788 venue_order_id: Option<VenueOrderId>,
1789 ) -> anyhow::Result<OrderStatusReport> {
1790 let mut params = GetOrderParamsBuilder::default();
1791 params.symbol(instrument_id.symbol.as_str());
1792
1793 if let Some(venue_order_id) = venue_order_id {
1794 params.filter(serde_json::json!({
1795 "orderID": venue_order_id.as_str()
1796 }));
1797 } else if let Some(client_order_id) = client_order_id {
1798 params.filter(serde_json::json!({
1799 "clOrdID": client_order_id.as_str()
1800 }));
1801 }
1802
1803 params.count(1i32);
1804 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1805
1806 let response = self.inner.get_orders(params).await?;
1807
1808 let order = response
1809 .into_iter()
1810 .next()
1811 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1812
1813 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1814 let ts_init = self.generate_ts_init();
1815
1816 parse_order_status_report(&order, &instrument, ts_init)
1817 }
1818
1819 pub async fn request_order_status_reports(
1828 &self,
1829 instrument_id: Option<InstrumentId>,
1830 open_only: bool,
1831 limit: Option<u32>,
1832 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1833 let mut params = GetOrderParamsBuilder::default();
1834
1835 if let Some(instrument_id) = &instrument_id {
1836 params.symbol(instrument_id.symbol.as_str());
1837 }
1838
1839 if open_only {
1840 params.filter(serde_json::json!({
1841 "open": true
1842 }));
1843 }
1844
1845 if let Some(limit) = limit {
1846 params.count(limit as i32);
1847 } else {
1848 params.count(500); }
1850
1851 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1854
1855 let response = self.inner.get_orders(params).await?;
1856
1857 let ts_init = self.generate_ts_init();
1858
1859 let mut reports = Vec::new();
1860
1861 for order in response {
1862 let Some(symbol) = order.symbol else {
1864 tracing::warn!("Order response missing symbol, skipping");
1865 continue;
1866 };
1867
1868 let Ok(instrument) = self.instrument_from_cache(symbol) else {
1869 tracing::debug!(
1870 symbol = %symbol,
1871 "Skipping order report for instrument not in cache"
1872 );
1873 continue;
1874 };
1875
1876 match parse_order_status_report(&order, &instrument, ts_init) {
1877 Ok(report) => reports.push(report),
1878 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1879 }
1880 }
1881
1882 Self::populate_linked_order_ids(&mut reports);
1883
1884 Ok(reports)
1885 }
1886
1887 pub async fn request_trades(
1893 &self,
1894 instrument_id: InstrumentId,
1895 start: Option<DateTime<Utc>>,
1896 end: Option<DateTime<Utc>>,
1897 limit: Option<u32>,
1898 ) -> anyhow::Result<Vec<TradeTick>> {
1899 let mut params = GetTradeParamsBuilder::default();
1900 params.symbol(instrument_id.symbol.as_str());
1901
1902 if let Some(start) = start {
1903 params.start_time(start);
1904 }
1905
1906 if let Some(end) = end {
1907 params.end_time(end);
1908 }
1909
1910 if let (Some(start), Some(end)) = (start, end) {
1911 anyhow::ensure!(
1912 start < end,
1913 "Invalid time range: start={start:?} end={end:?}",
1914 );
1915 }
1916
1917 if let Some(limit) = limit {
1918 let clamped_limit = limit.min(1000);
1919 if limit > 1000 {
1920 tracing::warn!(
1921 limit,
1922 clamped_limit,
1923 "BitMEX trade request limit exceeds venue maximum; clamping",
1924 );
1925 }
1926 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1927 }
1928 params.reverse(false);
1929 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1930
1931 let response = self.inner.get_trades(params).await?;
1932
1933 let ts_init = self.generate_ts_init();
1934
1935 let mut parsed_trades = Vec::new();
1936
1937 for trade in response {
1938 if let Some(start) = start
1939 && trade.timestamp < start
1940 {
1941 continue;
1942 }
1943
1944 if let Some(end) = end
1945 && trade.timestamp > end
1946 {
1947 continue;
1948 }
1949
1950 let price_precision = self.get_price_precision(trade.symbol)?;
1951
1952 match parse_trade(trade, price_precision, ts_init) {
1953 Ok(trade) => parsed_trades.push(trade),
1954 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1955 }
1956 }
1957
1958 Ok(parsed_trades)
1959 }
1960
1961 pub async fn request_bars(
1968 &self,
1969 mut bar_type: BarType,
1970 start: Option<DateTime<Utc>>,
1971 end: Option<DateTime<Utc>>,
1972 limit: Option<u32>,
1973 partial: bool,
1974 ) -> anyhow::Result<Vec<Bar>> {
1975 bar_type = bar_type.standard();
1976
1977 anyhow::ensure!(
1978 bar_type.aggregation_source() == AggregationSource::External,
1979 "Only EXTERNAL aggregation bars are supported"
1980 );
1981 anyhow::ensure!(
1982 bar_type.spec().price_type == PriceType::Last,
1983 "Only LAST price type bars are supported"
1984 );
1985 if let (Some(start), Some(end)) = (start, end) {
1986 anyhow::ensure!(
1987 start < end,
1988 "Invalid time range: start={start:?} end={end:?}"
1989 );
1990 }
1991
1992 let spec = bar_type.spec();
1993 let bin_size = match (spec.aggregation, spec.step.get()) {
1994 (BarAggregation::Minute, 1) => "1m",
1995 (BarAggregation::Minute, 5) => "5m",
1996 (BarAggregation::Hour, 1) => "1h",
1997 (BarAggregation::Day, 1) => "1d",
1998 _ => anyhow::bail!(
1999 "BitMEX does not support {}-{:?}-{:?} bars",
2000 spec.step.get(),
2001 spec.aggregation,
2002 spec.price_type,
2003 ),
2004 };
2005
2006 let instrument_id = bar_type.instrument_id();
2007 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2008
2009 let mut params = GetTradeBucketedParamsBuilder::default();
2010 params.symbol(instrument_id.symbol.as_str());
2011 params.bin_size(bin_size);
2012 if partial {
2013 params.partial(true);
2014 }
2015 if let Some(start) = start {
2016 params.start_time(start);
2017 }
2018 if let Some(end) = end {
2019 params.end_time(end);
2020 }
2021 if let Some(limit) = limit {
2022 let clamped_limit = limit.min(1000);
2023 if limit > 1000 {
2024 tracing::warn!(
2025 limit,
2026 clamped_limit,
2027 "BitMEX bar request limit exceeds venue maximum; clamping",
2028 );
2029 }
2030 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2031 }
2032 params.reverse(false);
2033 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2034
2035 let response = self.inner.get_trade_bucketed(params).await?;
2036 let ts_init = self.generate_ts_init();
2037 let mut bars = Vec::new();
2038
2039 for bin in response {
2040 if let Some(start) = start
2041 && bin.timestamp < start
2042 {
2043 continue;
2044 }
2045 if let Some(end) = end
2046 && bin.timestamp > end
2047 {
2048 continue;
2049 }
2050 if bin.symbol != instrument_id.symbol.inner() {
2051 tracing::warn!(
2052 symbol = %bin.symbol,
2053 expected = %instrument_id.symbol,
2054 "Skipping trade bin for unexpected symbol",
2055 );
2056 continue;
2057 }
2058
2059 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2060 Ok(bar) => bars.push(bar),
2061 Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2062 }
2063 }
2064
2065 Ok(bars)
2066 }
2067
2068 pub async fn request_fill_reports(
2074 &self,
2075 instrument_id: Option<InstrumentId>,
2076 limit: Option<u32>,
2077 ) -> anyhow::Result<Vec<FillReport>> {
2078 let mut params = GetExecutionParamsBuilder::default();
2079 if let Some(instrument_id) = instrument_id {
2080 params.symbol(instrument_id.symbol.as_str());
2081 }
2082 if let Some(limit) = limit {
2083 params.count(limit as i32);
2084 } else {
2085 params.count(500); }
2087 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2090
2091 let response = self.inner.get_executions(params).await?;
2092
2093 let ts_init = self.generate_ts_init();
2094
2095 let mut reports = Vec::new();
2096
2097 for exec in response {
2098 let Some(symbol) = exec.symbol else {
2100 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2101 continue;
2102 };
2103 let symbol_str = symbol.to_string();
2104
2105 let instrument = match self.instrument_from_cache(symbol) {
2106 Ok(instrument) => instrument,
2107 Err(e) => {
2108 tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2109 continue;
2110 }
2111 };
2112
2113 match parse_fill_report(exec, &instrument, ts_init) {
2114 Ok(report) => reports.push(report),
2115 Err(e) => {
2116 let error_msg = e.to_string();
2118 if error_msg.starts_with("Skipping non-trade execution")
2119 || error_msg.starts_with("Skipping execution without order_id")
2120 {
2121 tracing::debug!("{e}");
2122 } else {
2123 tracing::error!("Failed to parse fill report: {e}");
2124 }
2125 }
2126 }
2127 }
2128
2129 Ok(reports)
2130 }
2131
2132 pub async fn request_position_status_reports(
2138 &self,
2139 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2140 let params = GetPositionParamsBuilder::default()
2141 .count(500) .build()
2143 .map_err(|e| anyhow::anyhow!(e))?;
2144
2145 let response = self.inner.get_positions(params).await?;
2146
2147 let ts_init = self.generate_ts_init();
2148
2149 let mut reports = Vec::new();
2150
2151 for pos in response {
2152 let symbol = pos.symbol;
2153 let instrument = match self.instrument_from_cache(symbol) {
2154 Ok(instrument) => instrument,
2155 Err(e) => {
2156 tracing::error!(
2157 symbol = pos.symbol.as_str(),
2158 "Instrument not found in cache for position parsing: {e}"
2159 );
2160 continue;
2161 }
2162 };
2163
2164 match parse_position_report(pos, &instrument, ts_init) {
2165 Ok(report) => reports.push(report),
2166 Err(e) => tracing::error!("Failed to parse position report: {e}"),
2167 }
2168 }
2169
2170 Ok(reports)
2171 }
2172
2173 pub async fn update_position_leverage(
2181 &self,
2182 symbol: &str,
2183 leverage: f64,
2184 ) -> anyhow::Result<PositionStatusReport> {
2185 let params = PostPositionLeverageParams {
2186 symbol: symbol.to_string(),
2187 leverage,
2188 target_account_id: None,
2189 };
2190
2191 let response = self.inner.update_position_leverage(params).await?;
2192
2193 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2194 let ts_init = self.generate_ts_init();
2195
2196 parse_position_report(response, &instrument, ts_init)
2197 }
2198}
2199
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Builds an accepted BUY LIMIT report for `XBTUSD.BITMEX` with the given identifiers,
    /// contingency type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        let with_list = match order_list_id {
            Some(id) => base.with_order_list_id(OrderListId::from(id)),
            None => base,
        };

        with_list.with_contingency_type(contingency_type)
    }

    /// Creates a raw client with test credentials and every optional setting after the
    /// `Some(60)` argument left as `None`.
    fn default_raw_client() -> BitmexRawHttpClient {
        BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create test client")
    }

    /// Wraps client order ID strings in the shape stored in `linked_order_ids`.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let client = default_raw_client();

        let headers = client
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        for name in ["api-key", "api-signature", "api-expires"] {
            assert!(headers.contains_key(name));
        }
        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let client = default_raw_client();

        let payload = json!({"symbol": "XBTUSD", "orderQty": 100});
        let payload_bytes = serde_json::to_vec(&payload).unwrap();

        let unsigned_body = client
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let signed_body = client
            .sign_request(&Method::POST, "/api/v1/order", Some(&payload_bytes))
            .unwrap();

        // Including a body must change the signature.
        assert_ne!(
            unsigned_body.get("api-signature").unwrap(),
            signed_body.get("api-signature").unwrap()
        );
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        let client_default = default_raw_client();

        // Identical to the default client except for the `Some(30_000)` argument
        // (presumably the recv-window override in milliseconds; confirm against
        // `with_credentials`).
        let client_custom = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            Some(30_000),
            None,
            None,
            None,
        )
        .expect("Failed to create test client");

        let headers_default = client_default
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

        // Both expiries lie in the future, and the custom one is later.
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);
        assert!(expires_custom > expires_default);

        // The gap is checked against a tolerance band rather than an exact value.
        let gap = expires_custom - expires_default;
        assert!((18..=25).contains(&gap));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Each report links to the other two members of the shared order list.
        assert_eq!(reports[0].linked_order_ids, linked(&[&stop, &take]));
        assert_eq!(reports[1].linked_order_ids, linked(&[&entry, &take]));
        assert_eq!(reports[2].linked_order_ids, linked(&[&entry, &stop]));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        // No order list ID is set; only the client order IDs share a prefix.
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(reports[0].linked_order_ids, linked(&[&stop, &take]));
        assert_eq!(reports[1].linked_order_ids, linked(&[&entry, &take]));
        assert_eq!(reports[2].linked_order_ids, linked(&[&entry, &stop]));
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // The non-contingent order gains no links.
        assert!(reports[0].linked_order_ids.is_none());

        // The second order ends up without links and with its contingency type reset.
        assert!(reports[1].linked_order_ids.is_none());
        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
    }
}