1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46 TimeInForce, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65 error::{BitmexErrorResponse, BitmexHttpError},
66 models::{
67 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69 },
70 query::{
71 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74 PostPositionLeverageParams, PutOrderParams,
75 },
76};
77use crate::{
78 common::{
79 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80 credential::Credential,
81 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82 parse::{parse_account_state, quantity_to_u32},
83 },
84 http::{
85 parse::{
86 InstrumentParseResult, parse_fill_report, parse_instrument_any,
87 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88 },
89 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90 },
91 websocket::messages::BitmexMarginMsg,
92};
93
94const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
100const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
101const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
102
103const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
104const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
105
106static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
107 vec![
108 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
109 Ustr::from(BITMEX_MINUTE_RATE_KEY),
110 ]
111});
112
113#[derive(Debug, Serialize, Deserialize)]
115pub struct BitmexResponse<T> {
116 pub data: Vec<T>,
118}
119
120#[derive(Debug, Clone)]
140pub struct BitmexRawHttpClient {
141 base_url: String,
142 client: HttpClient,
143 credential: Option<Credential>,
144 recv_window_ms: u64,
145 retry_manager: RetryManager<BitmexHttpError>,
146 cancellation_token: CancellationToken,
147}
148
149impl Default for BitmexRawHttpClient {
150 fn default() -> Self {
151 Self::new(None, Some(60), None, None, None, None, None, None, None)
152 .expect("Failed to create default BitmexHttpInnerClient")
153 }
154}
155
156impl BitmexRawHttpClient {
157 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 base_url: Option<String>,
169 timeout_secs: Option<u64>,
170 max_retries: Option<u32>,
171 retry_delay_ms: Option<u64>,
172 retry_delay_max_ms: Option<u64>,
173 recv_window_ms: Option<u64>,
174 max_requests_per_second: Option<u32>,
175 max_requests_per_minute: Option<u32>,
176 proxy_url: Option<String>,
177 ) -> Result<Self, BitmexHttpError> {
178 let retry_config = RetryConfig {
179 max_retries: max_retries.unwrap_or(3),
180 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182 backoff_factor: 2.0,
183 jitter_ms: 1000,
184 operation_timeout_ms: Some(60_000),
185 immediate_first: false,
186 max_elapsed_ms: Some(180_000),
187 };
188
189 let retry_manager = RetryManager::new(retry_config);
190
191 let max_req_per_sec =
192 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193 let max_req_per_min =
194 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196 Ok(Self {
197 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198 client: HttpClient::new(
199 Self::default_headers(),
200 vec![],
201 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202 Some(Self::default_quota(max_req_per_sec)),
203 timeout_secs,
204 proxy_url,
205 )
206 .map_err(|e| {
207 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208 })?,
209 credential: None,
210 recv_window_ms: recv_window_ms.unwrap_or(10_000),
211 retry_manager,
212 cancellation_token: CancellationToken::new(),
213 })
214 }
215
216 #[allow(clippy::too_many_arguments)]
223 pub fn with_credentials(
224 api_key: String,
225 api_secret: String,
226 base_url: String,
227 timeout_secs: Option<u64>,
228 max_retries: Option<u32>,
229 retry_delay_ms: Option<u64>,
230 retry_delay_max_ms: Option<u64>,
231 recv_window_ms: Option<u64>,
232 max_requests_per_second: Option<u32>,
233 max_requests_per_minute: Option<u32>,
234 proxy_url: Option<String>,
235 ) -> Result<Self, BitmexHttpError> {
236 let retry_config = RetryConfig {
237 max_retries: max_retries.unwrap_or(3),
238 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240 backoff_factor: 2.0,
241 jitter_ms: 1000,
242 operation_timeout_ms: Some(60_000),
243 immediate_first: false,
244 max_elapsed_ms: Some(180_000),
245 };
246
247 let retry_manager = RetryManager::new(retry_config);
248
249 let max_req_per_sec =
250 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251 let max_req_per_min =
252 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254 Ok(Self {
255 base_url,
256 client: HttpClient::new(
257 Self::default_headers(),
258 vec![],
259 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260 Some(Self::default_quota(max_req_per_sec)),
261 timeout_secs,
262 proxy_url,
263 )
264 .map_err(|e| {
265 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266 })?,
267 credential: Some(Credential::new(api_key, api_secret)),
268 recv_window_ms: recv_window_ms.unwrap_or(10_000),
269 retry_manager,
270 cancellation_token: CancellationToken::new(),
271 })
272 }
273
274 fn default_headers() -> HashMap<String, String> {
275 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276 }
277
278 fn default_quota(max_requests_per_second: u32) -> Quota {
279 Quota::per_second(
280 NonZeroU32::new(max_requests_per_second)
281 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282 )
283 }
284
285 fn rate_limiter_quotas(
286 max_requests_per_second: u32,
287 max_requests_per_minute: u32,
288 ) -> Vec<(String, Quota)> {
289 let per_sec_quota = Quota::per_second(
290 NonZeroU32::new(max_requests_per_second)
291 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292 );
293 let per_min_quota =
294 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296 }));
297
298 vec![
299 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301 ]
302 }
303
304 fn rate_limit_keys() -> Vec<Ustr> {
305 RATE_LIMIT_KEYS.clone()
306 }
307
308 pub fn cancel_all_requests(&self) {
310 self.cancellation_token.cancel();
311 }
312
313 pub fn cancellation_token(&self) -> &CancellationToken {
315 &self.cancellation_token
316 }
317
318 fn sign_request(
319 &self,
320 method: &Method,
321 endpoint: &str,
322 body: Option<&[u8]>,
323 ) -> Result<HashMap<String, String>, BitmexHttpError> {
324 let credential = self
325 .credential
326 .as_ref()
327 .ok_or(BitmexHttpError::MissingCredentials)?;
328
329 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332 let full_path = if endpoint.starts_with("/api/v1") {
333 endpoint.to_string()
334 } else {
335 format!("/api/v1{endpoint}")
336 };
337
338 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340 let mut headers = HashMap::new();
341 headers.insert("api-expires".to_string(), expires.to_string());
342 headers.insert("api-key".to_string(), credential.api_key.to_string());
343 headers.insert("api-signature".to_string(), signature);
344
345 if body.is_some()
347 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348 {
349 headers.insert(
350 "Content-Type".to_string(),
351 "application/x-www-form-urlencoded".to_string(),
352 );
353 }
354
355 Ok(headers)
356 }
357
358 async fn send_request<T: DeserializeOwned, P: Serialize>(
359 &self,
360 method: Method,
361 endpoint: &str,
362 params: Option<&P>,
363 body: Option<Vec<u8>>,
364 authenticate: bool,
365 ) -> Result<T, BitmexHttpError> {
366 let endpoint = endpoint.to_string();
367 let method_clone = method.clone();
368 let body_clone = body.clone();
369
370 let params_str = if method == Method::GET || method == Method::DELETE {
373 params
374 .map(serde_urlencoded::to_string)
375 .transpose()
376 .map_err(|e| {
377 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378 })?
379 } else {
380 None
381 };
382
383 let full_endpoint = match params_str {
384 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385 _ => endpoint.clone(),
386 };
387
388 let url = format!("{}{}", self.base_url, full_endpoint);
389
390 let operation = || {
391 let url = url.clone();
392 let method = method_clone.clone();
393 let body = body_clone.clone();
394 let full_endpoint = full_endpoint.clone();
395
396 async move {
397 let headers = if authenticate {
398 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399 } else {
400 None
401 };
402
403 let rate_keys = Self::rate_limit_keys();
404 let resp = self
405 .client
406 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407 .await?;
408
409 if resp.status.is_success() {
410 serde_json::from_slice(&resp.body).map_err(Into::into)
411 } else if let Ok(error_resp) =
412 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413 {
414 Err(error_resp.into())
415 } else {
416 Err(BitmexHttpError::UnexpectedStatus {
417 status: StatusCode::from_u16(resp.status.as_u16())
418 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419 body: String::from_utf8_lossy(&resp.body).to_string(),
420 })
421 }
422 }
423 };
424
425 let should_retry = |error: &BitmexHttpError| -> bool {
442 match error {
443 BitmexHttpError::NetworkError(_) => true,
444 BitmexHttpError::UnexpectedStatus { status, .. } => {
445 status.as_u16() >= 500 || status.as_u16() == 429
446 }
447 BitmexHttpError::BitmexError {
448 error_name,
449 message,
450 } => {
451 error_name == "RateLimitError"
452 || (error_name == "HTTPError"
453 && message.to_lowercase().contains("rate limit"))
454 }
455 _ => false,
456 }
457 };
458
459 let create_error = |msg: String| -> BitmexHttpError {
460 if msg == "canceled" {
461 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462 } else {
463 BitmexHttpError::NetworkError(msg)
464 }
465 };
466
467 self.retry_manager
468 .execute_with_retry_with_cancel(
469 endpoint.as_str(),
470 operation,
471 should_retry,
472 create_error,
473 &self.cancellation_token,
474 )
475 .await
476 }
477
478 pub async fn get_instruments(
484 &self,
485 active_only: bool,
486 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487 let path = if active_only {
488 "/instrument/active"
489 } else {
490 "/instrument"
491 };
492 self.send_request::<_, ()>(Method::GET, path, None, None, false)
493 .await
494 }
495
496 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506 let response: BitmexApiInfo = self
507 .send_request::<_, ()>(Method::GET, "", None, None, false)
508 .await?;
509 Ok(response.timestamp)
510 }
511
512 pub async fn get_instrument(
523 &self,
524 symbol: &str,
525 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526 let path = &format!("/instrument?symbol={symbol}");
527 let instruments: Vec<BitmexInstrument> = self
528 .send_request::<_, ()>(Method::GET, path, None, None, false)
529 .await?;
530
531 Ok(instruments.into_iter().next())
532 }
533
534 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540 let endpoint = "/user/wallet";
541 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542 .await
543 }
544
545 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551 let path = format!("/user/margin?currency={currency}");
552 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553 .await
554 }
555
556 pub async fn get_trades(
566 &self,
567 params: GetTradeParams,
568 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
570 .await
571 }
572
573 pub async fn get_trade_bucketed(
579 &self,
580 params: GetTradeBucketedParams,
581 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
583 .await
584 }
585
586 pub async fn get_orders(
596 &self,
597 params: GetOrderParams,
598 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
600 .await
601 }
602
603 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613 let body = serde_urlencoded::to_string(¶ms)
615 .map_err(|e| {
616 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617 })?
618 .into_bytes();
619 let path = "/order";
620 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621 .await
622 }
623
624 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634 let body = serde_urlencoded::to_string(¶ms)
636 .map_err(|e| {
637 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638 })?
639 .into_bytes();
640 let path = "/order";
641 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642 .await
643 }
644
645 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655 let body = serde_urlencoded::to_string(¶ms)
657 .map_err(|e| {
658 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659 })?
660 .into_bytes();
661 let path = "/order";
662 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663 .await
664 }
665
666 pub async fn cancel_all_orders(
680 &self,
681 params: DeleteAllOrdersParams,
682 ) -> Result<Value, BitmexHttpError> {
683 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
684 .await
685 }
686
687 pub async fn get_executions(
697 &self,
698 params: GetExecutionParams,
699 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
701 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702 })?;
703 let path = format!("/execution/tradeHistory?{query}");
704 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705 .await
706 }
707
708 pub async fn get_positions(
718 &self,
719 params: GetPositionParams,
720 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
722 .await
723 }
724
725 pub async fn update_position_leverage(
735 &self,
736 params: PostPositionLeverageParams,
737 ) -> Result<BitmexPosition, BitmexHttpError> {
738 let body = serde_urlencoded::to_string(¶ms)
740 .map_err(|e| {
741 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742 })?
743 .into_bytes();
744 let path = "/position/leverage";
745 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746 .await
747 }
748}
749
750#[derive(Debug)]
755#[cfg_attr(
756 feature = "python",
757 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
758)]
759pub struct BitmexHttpClient {
760 inner: Arc<BitmexRawHttpClient>,
761 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
762 cache_initialized: AtomicBool,
763}
764
765impl Clone for BitmexHttpClient {
766 fn clone(&self) -> Self {
767 let cache_initialized = AtomicBool::new(false);
768
769 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770 if is_initialized {
771 cache_initialized.store(true, Ordering::Release);
772 }
773
774 Self {
775 inner: self.inner.clone(),
776 instruments_cache: self.instruments_cache.clone(),
777 cache_initialized,
778 }
779 }
780}
781
782impl Default for BitmexHttpClient {
783 fn default() -> Self {
784 Self::new(
785 None,
786 None,
787 None,
788 false,
789 Some(60),
790 None,
791 None,
792 None,
793 None,
794 None,
795 None,
796 None, )
798 .expect("Failed to create default BitmexHttpClient")
799 }
800}
801
802impl BitmexHttpClient {
803 #[allow(clippy::too_many_arguments)]
809 pub fn new(
810 base_url: Option<String>,
811 api_key: Option<String>,
812 api_secret: Option<String>,
813 testnet: bool,
814 timeout_secs: Option<u64>,
815 max_retries: Option<u32>,
816 retry_delay_ms: Option<u64>,
817 retry_delay_max_ms: Option<u64>,
818 recv_window_ms: Option<u64>,
819 max_requests_per_second: Option<u32>,
820 max_requests_per_minute: Option<u32>,
821 proxy_url: Option<String>,
822 ) -> Result<Self, BitmexHttpError> {
823 let url = base_url.unwrap_or_else(|| {
825 if testnet {
826 BITMEX_HTTP_TESTNET_URL.to_string()
827 } else {
828 BITMEX_HTTP_URL.to_string()
829 }
830 });
831
832 let inner = match (api_key, api_secret) {
833 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834 key,
835 secret,
836 url,
837 timeout_secs,
838 max_retries,
839 retry_delay_ms,
840 retry_delay_max_ms,
841 recv_window_ms,
842 max_requests_per_second,
843 max_requests_per_minute,
844 proxy_url,
845 )?,
846 _ => BitmexRawHttpClient::new(
847 Some(url),
848 timeout_secs,
849 max_retries,
850 retry_delay_ms,
851 retry_delay_max_ms,
852 recv_window_ms,
853 max_requests_per_second,
854 max_requests_per_minute,
855 proxy_url,
856 )?,
857 };
858
859 Ok(Self {
860 inner: Arc::new(inner),
861 instruments_cache: Arc::new(DashMap::new()),
862 cache_initialized: AtomicBool::new(false),
863 })
864 }
865
866 pub fn from_env() -> anyhow::Result<Self> {
873 Self::with_credentials(
874 None, None, None, None, None, None, None, None, None, None, None,
875 )
876 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
877 }
878
879 #[allow(clippy::too_many_arguments)]
889 pub fn with_credentials(
890 api_key: Option<String>,
891 api_secret: Option<String>,
892 base_url: Option<String>,
893 timeout_secs: Option<u64>,
894 max_retries: Option<u32>,
895 retry_delay_ms: Option<u64>,
896 retry_delay_max_ms: Option<u64>,
897 recv_window_ms: Option<u64>,
898 max_requests_per_second: Option<u32>,
899 max_requests_per_minute: Option<u32>,
900 proxy_url: Option<String>,
901 ) -> anyhow::Result<Self> {
902 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905 let (key_var, secret_var) = if testnet {
907 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908 } else {
909 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910 };
911
912 let api_key = get_or_env_var_opt(api_key, key_var);
913 let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915 if api_key.is_some() && api_secret.is_none() {
917 anyhow::bail!("{secret_var} is required when {key_var} is provided");
918 }
919 if api_key.is_none() && api_secret.is_some() {
920 anyhow::bail!("{key_var} is required when {secret_var} is provided");
921 }
922
923 Self::new(
924 base_url,
925 api_key,
926 api_secret,
927 testnet,
928 timeout_secs,
929 max_retries,
930 retry_delay_ms,
931 retry_delay_max_ms,
932 recv_window_ms,
933 max_requests_per_second,
934 max_requests_per_minute,
935 proxy_url,
936 )
937 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938 }
939
940 #[must_use]
942 pub fn base_url(&self) -> &str {
943 self.inner.base_url.as_str()
944 }
945
946 #[must_use]
948 pub fn api_key(&self) -> Option<&str> {
949 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
950 }
951
952 #[must_use]
954 pub fn api_key_masked(&self) -> Option<String> {
955 self.inner.credential.as_ref().map(|c| c.api_key_masked())
956 }
957
958 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
966 self.inner.get_server_time().await
967 }
968
969 fn generate_ts_init(&self) -> UnixNanos {
971 get_atomic_clock_realtime().get_time_ns()
972 }
973
974 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
976 matches!(
977 contingency_type,
978 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
979 )
980 }
981
982 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
984 matches!(
985 contingency_type,
986 ContingencyType::Oco | ContingencyType::Oto
987 )
988 }
989
990 fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
992 let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
993 let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
994 let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
995 let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
996
997 for report in reports.iter() {
998 let Some(client_order_id) = report.client_order_id else {
999 continue;
1000 };
1001
1002 if let Some(order_list_id) = report.order_list_id {
1003 order_list_groups
1004 .entry(order_list_id)
1005 .or_default()
1006 .push(client_order_id);
1007
1008 if Self::is_parent_contingency(report.contingency_type) {
1009 order_list_parents
1010 .entry(order_list_id)
1011 .or_insert(client_order_id);
1012 }
1013 }
1014
1015 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1016 && Self::is_contingent_order(report.contingency_type)
1017 {
1018 prefix_groups
1019 .entry(base.to_owned())
1020 .or_default()
1021 .push(client_order_id);
1022
1023 if Self::is_parent_contingency(report.contingency_type) {
1024 prefix_parents
1025 .entry(base.to_owned())
1026 .or_insert(client_order_id);
1027 }
1028 }
1029 }
1030
1031 for report in reports.iter_mut() {
1032 let Some(client_order_id) = report.client_order_id else {
1033 continue;
1034 };
1035
1036 if report.linked_order_ids.is_some() {
1037 continue;
1038 }
1039
1040 if !Self::is_contingent_order(report.contingency_type) {
1042 continue;
1043 }
1044
1045 if let Some(order_list_id) = report.order_list_id
1046 && let Some(group) = order_list_groups.get(&order_list_id)
1047 {
1048 let mut linked: Vec<ClientOrderId> = group
1049 .iter()
1050 .copied()
1051 .filter(|candidate| candidate != &client_order_id)
1052 .collect();
1053
1054 if !linked.is_empty() {
1055 if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1056 if client_order_id == *parent_id {
1057 report.parent_order_id = None;
1058 } else {
1059 linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1060 report.parent_order_id = Some(*parent_id);
1061 }
1062 } else {
1063 report.parent_order_id = None;
1064 }
1065
1066 log::trace!(
1067 "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
1068 client_order_id,
1069 order_list_id,
1070 report.contingency_type,
1071 linked,
1072 );
1073 report.linked_order_ids = Some(linked);
1074 continue;
1075 }
1076
1077 log::trace!(
1078 "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
1079 client_order_id,
1080 order_list_id,
1081 report.contingency_type,
1082 group,
1083 );
1084 report.parent_order_id = None;
1085 } else if report.order_list_id.is_none() {
1086 report.parent_order_id = None;
1087 }
1088
1089 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1090 && let Some(group) = prefix_groups.get(base)
1091 {
1092 let mut linked: Vec<ClientOrderId> = group
1093 .iter()
1094 .copied()
1095 .filter(|candidate| candidate != &client_order_id)
1096 .collect();
1097
1098 if !linked.is_empty() {
1099 if let Some(parent_id) = prefix_parents.get(base) {
1100 if client_order_id == *parent_id {
1101 report.parent_order_id = None;
1102 } else {
1103 linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1104 report.parent_order_id = Some(*parent_id);
1105 }
1106 } else {
1107 report.parent_order_id = None;
1108 }
1109
1110 log::trace!(
1111 "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
1112 client_order_id,
1113 report.contingency_type,
1114 base,
1115 linked,
1116 );
1117 report.linked_order_ids = Some(linked);
1118 continue;
1119 }
1120
1121 log::trace!(
1122 "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
1123 client_order_id,
1124 report.contingency_type,
1125 base,
1126 group,
1127 );
1128 report.parent_order_id = None;
1129 } else if client_order_id.as_str().contains('-') {
1130 report.parent_order_id = None;
1131 }
1132
1133 if Self::is_contingent_order(report.contingency_type) {
1134 log::warn!(
1135 "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
1136 report.client_order_id,
1137 report.order_list_id,
1138 report.contingency_type,
1139 );
1140 report.contingency_type = ContingencyType::NoContingency;
1141 report.parent_order_id = None;
1142 }
1143
1144 report.linked_order_ids = None;
1145 }
1146 }
1147
1148 pub fn cancel_all_requests(&self) {
1150 self.inner.cancel_all_requests();
1151 }
1152
1153 pub fn cancellation_token(&self) -> CancellationToken {
1155 self.inner.cancellation_token().clone()
1156 }
1157
1158 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1162 self.instruments_cache
1163 .insert(instrument.raw_symbol().inner(), instrument);
1164 self.cache_initialized.store(true, Ordering::Release);
1165 }
1166
1167 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169 self.instruments_cache
1170 .get(symbol)
1171 .map(|entry| entry.value().clone())
1172 }
1173
1174 pub async fn request_instrument(
1182 &self,
1183 instrument_id: InstrumentId,
1184 ) -> anyhow::Result<Option<InstrumentAny>> {
1185 let response = self
1186 .inner
1187 .get_instrument(instrument_id.symbol.as_str())
1188 .await?;
1189
1190 let instrument = match response {
1191 Some(instrument) => instrument,
1192 None => return Ok(None),
1193 };
1194
1195 let ts_init = self.generate_ts_init();
1196
1197 match parse_instrument_any(&instrument, ts_init) {
1198 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199 InstrumentParseResult::Unsupported {
1200 symbol,
1201 instrument_type,
1202 } => {
1203 log::debug!(
1204 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205 );
1206 Ok(None)
1207 }
1208 InstrumentParseResult::Inactive { symbol, state } => {
1209 log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1210 Ok(None)
1211 }
1212 InstrumentParseResult::Failed {
1213 symbol,
1214 instrument_type,
1215 error,
1216 } => {
1217 log::error!(
1218 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1219 );
1220 Ok(None)
1221 }
1222 }
1223 }
1224
1225 pub async fn request_instruments(
1231 &self,
1232 active_only: bool,
1233 ) -> anyhow::Result<Vec<InstrumentAny>> {
1234 let instruments = self.inner.get_instruments(active_only).await?;
1235 let ts_init = self.generate_ts_init();
1236
1237 let mut parsed_instruments = Vec::new();
1238 let mut skipped_count = 0;
1239 let mut inactive_count = 0;
1240 let mut failed_count = 0;
1241 let total_count = instruments.len();
1242
1243 for inst in instruments {
1244 match parse_instrument_any(&inst, ts_init) {
1245 InstrumentParseResult::Ok(instrument_any) => {
1246 parsed_instruments.push(*instrument_any);
1247 }
1248 InstrumentParseResult::Unsupported {
1249 symbol,
1250 instrument_type,
1251 } => {
1252 skipped_count += 1;
1253 log::debug!(
1254 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1255 );
1256 }
1257 InstrumentParseResult::Inactive { symbol, state } => {
1258 inactive_count += 1;
1259 log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1260 }
1261 InstrumentParseResult::Failed {
1262 symbol,
1263 instrument_type,
1264 error,
1265 } => {
1266 failed_count += 1;
1267 log::error!(
1268 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1269 );
1270 }
1271 }
1272 }
1273
1274 if skipped_count > 0 {
1275 log::info!(
1276 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1277 );
1278 }
1279
1280 if inactive_count > 0 {
1281 log::info!(
1282 "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1283 );
1284 }
1285
1286 if failed_count > 0 {
1287 log::error!(
1288 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1289 parsed_instruments.len()
1290 );
1291 }
1292
1293 Ok(parsed_instruments)
1294 }
1295
1296 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1306 let inner = self.inner.clone();
1307 inner.get_wallet().await
1308 }
1309
1310 pub async fn get_orders(
1320 &self,
1321 params: GetOrderParams,
1322 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1323 let inner = self.inner.clone();
1324 inner.get_orders(params).await
1325 }
1326
1327 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1333 self.get_instrument(&symbol).ok_or_else(|| {
1334 anyhow::anyhow!(
1335 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1336 )
1337 })
1338 }
1339
1340 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1347 self.instrument_from_cache(symbol)
1348 .map(|instrument| instrument.price_precision())
1349 }
1350
1351 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1357 self.inner
1358 .get_margin(currency)
1359 .await
1360 .map_err(|e| anyhow::anyhow!(e))
1361 }
1362
1363 pub async fn request_account_state(
1369 &self,
1370 account_id: AccountId,
1371 ) -> anyhow::Result<AccountState> {
1372 let margin = self
1374 .inner
1375 .get_margin("XBt")
1376 .await
1377 .map_err(|e| anyhow::anyhow!(e))?;
1378
1379 let ts_init =
1380 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1381
1382 let margin_msg = BitmexMarginMsg {
1384 account: margin.account,
1385 currency: margin.currency,
1386 risk_limit: margin.risk_limit,
1387 amount: margin.amount,
1388 prev_realised_pnl: margin.prev_realised_pnl,
1389 gross_comm: margin.gross_comm,
1390 gross_open_cost: margin.gross_open_cost,
1391 gross_open_premium: margin.gross_open_premium,
1392 gross_exec_cost: margin.gross_exec_cost,
1393 gross_mark_value: margin.gross_mark_value,
1394 risk_value: margin.risk_value,
1395 init_margin: margin.init_margin,
1396 maint_margin: margin.maint_margin,
1397 target_excess_margin: margin.target_excess_margin,
1398 realised_pnl: margin.realised_pnl,
1399 unrealised_pnl: margin.unrealised_pnl,
1400 wallet_balance: margin.wallet_balance,
1401 margin_balance: margin.margin_balance,
1402 margin_leverage: margin.margin_leverage,
1403 margin_used_pcnt: margin.margin_used_pcnt,
1404 excess_margin: margin.excess_margin,
1405 available_margin: margin.available_margin,
1406 withdrawable_margin: margin.withdrawable_margin,
1407 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1410 foreign_margin_balance: None,
1411 foreign_requirement: None,
1412 };
1413
1414 parse_account_state(&margin_msg, account_id, ts_init)
1415 }
1416
1417 #[allow(clippy::too_many_arguments)]
1424 pub async fn submit_order(
1425 &self,
1426 instrument_id: InstrumentId,
1427 client_order_id: ClientOrderId,
1428 order_side: OrderSide,
1429 order_type: OrderType,
1430 quantity: Quantity,
1431 time_in_force: TimeInForce,
1432 price: Option<Price>,
1433 trigger_price: Option<Price>,
1434 trigger_type: Option<TriggerType>,
1435 display_qty: Option<Quantity>,
1436 post_only: bool,
1437 reduce_only: bool,
1438 order_list_id: Option<OrderListId>,
1439 contingency_type: Option<ContingencyType>,
1440 ) -> anyhow::Result<OrderStatusReport> {
1441 use crate::common::enums::{
1442 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1443 };
1444
1445 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1446
1447 let mut params = super::query::PostOrderParamsBuilder::default();
1448 params.text(NAUTILUS_TRADER);
1449 params.symbol(instrument_id.symbol.as_str());
1450 params.cl_ord_id(client_order_id.as_str());
1451
1452 let side = BitmexSide::try_from_order_side(order_side)?;
1453 params.side(side);
1454
1455 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1456 params.ord_type(ord_type);
1457
1458 params.order_qty(quantity_to_u32(&quantity, &instrument));
1459
1460 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1461 params.time_in_force(tif);
1462
1463 if let Some(price) = price {
1464 params.price(price.as_f64());
1465 }
1466
1467 if let Some(trigger_price) = trigger_price {
1468 params.stop_px(trigger_price.as_f64());
1469 }
1470
1471 if let Some(display_qty) = display_qty {
1472 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1473 }
1474
1475 if let Some(order_list_id) = order_list_id {
1476 params.cl_ord_link_id(order_list_id.as_str());
1477 }
1478
1479 let mut exec_inst = Vec::new();
1480
1481 if post_only {
1482 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1483 }
1484
1485 if reduce_only {
1486 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1487 }
1488
1489 if trigger_price.is_some()
1490 && let Some(trigger_type) = trigger_type
1491 {
1492 match trigger_type {
1493 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1494 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1495 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1496 _ => {} }
1498 }
1499
1500 if !exec_inst.is_empty() {
1501 params.exec_inst(exec_inst);
1502 }
1503
1504 if let Some(contingency_type) = contingency_type {
1505 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1506 params.contingency_type(bitmex_contingency);
1507 }
1508
1509 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1510
1511 let response = self.inner.place_order(params).await?;
1512
1513 let order: BitmexOrder = serde_json::from_value(response)?;
1514
1515 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1516 let reason = order
1517 .ord_rej_reason
1518 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1519 anyhow::bail!("Order rejected: {reason}");
1520 }
1521
1522 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1523 let ts_init = self.generate_ts_init();
1524
1525 parse_order_status_report(&order, &instrument, ts_init)
1526 }
1527
1528 pub async fn cancel_order(
1538 &self,
1539 instrument_id: InstrumentId,
1540 client_order_id: Option<ClientOrderId>,
1541 venue_order_id: Option<VenueOrderId>,
1542 ) -> anyhow::Result<OrderStatusReport> {
1543 let mut params = super::query::DeleteOrderParamsBuilder::default();
1544 params.text(NAUTILUS_TRADER);
1545
1546 if let Some(venue_order_id) = venue_order_id {
1547 params.order_id(vec![venue_order_id.as_str().to_string()]);
1548 } else if let Some(client_order_id) = client_order_id {
1549 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1550 } else {
1551 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1552 }
1553
1554 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1555
1556 let response = self.inner.cancel_orders(params).await?;
1557
1558 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1559 let order = orders
1560 .into_iter()
1561 .next()
1562 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1563
1564 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1565 let ts_init = self.generate_ts_init();
1566
1567 parse_order_status_report(&order, &instrument, ts_init)
1568 }
1569
1570 pub async fn cancel_orders(
1580 &self,
1581 instrument_id: InstrumentId,
1582 client_order_ids: Option<Vec<ClientOrderId>>,
1583 venue_order_ids: Option<Vec<VenueOrderId>>,
1584 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1585 let mut params = super::query::DeleteOrderParamsBuilder::default();
1586 params.text(NAUTILUS_TRADER);
1587
1588 if let Some(venue_order_ids) = venue_order_ids {
1591 if venue_order_ids.is_empty() {
1592 anyhow::bail!("venue_order_ids cannot be empty");
1593 }
1594 params.order_id(
1595 venue_order_ids
1596 .iter()
1597 .map(|id| id.to_string())
1598 .collect::<Vec<_>>(),
1599 );
1600 } else if let Some(client_order_ids) = client_order_ids {
1601 if client_order_ids.is_empty() {
1602 anyhow::bail!("client_order_ids cannot be empty");
1603 }
1604 params.cl_ord_id(
1605 client_order_ids
1606 .iter()
1607 .map(|id| id.to_string())
1608 .collect::<Vec<_>>(),
1609 );
1610 } else {
1611 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1612 }
1613
1614 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1615
1616 let response = self.inner.cancel_orders(params).await?;
1617
1618 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1619
1620 let ts_init = self.generate_ts_init();
1621 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1622
1623 let mut reports = Vec::new();
1624
1625 for order in orders {
1626 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1627 }
1628
1629 Self::populate_linked_order_ids(&mut reports);
1630
1631 Ok(reports)
1632 }
1633
1634 pub async fn cancel_all_orders(
1644 &self,
1645 instrument_id: InstrumentId,
1646 order_side: Option<OrderSide>,
1647 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1648 let mut params = DeleteAllOrdersParamsBuilder::default();
1649 params.text(NAUTILUS_TRADER);
1650 params.symbol(instrument_id.symbol.as_str());
1651
1652 if let Some(side) = order_side {
1653 let side = BitmexSide::try_from_order_side(side)?;
1654 params.filter(serde_json::json!({
1655 "side": side
1656 }));
1657 }
1658
1659 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1660
1661 let response = self.inner.cancel_all_orders(params).await?;
1662
1663 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1664
1665 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1666 let ts_init = self.generate_ts_init();
1667
1668 let mut reports = Vec::new();
1669
1670 for order in orders {
1671 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1672 }
1673
1674 Self::populate_linked_order_ids(&mut reports);
1675
1676 Ok(reports)
1677 }
1678
1679 pub async fn modify_order(
1690 &self,
1691 instrument_id: InstrumentId,
1692 client_order_id: Option<ClientOrderId>,
1693 venue_order_id: Option<VenueOrderId>,
1694 quantity: Option<Quantity>,
1695 price: Option<Price>,
1696 trigger_price: Option<Price>,
1697 ) -> anyhow::Result<OrderStatusReport> {
1698 let mut params = PutOrderParamsBuilder::default();
1699 params.text(NAUTILUS_TRADER);
1700
1701 if let Some(venue_order_id) = venue_order_id {
1703 params.order_id(venue_order_id.as_str());
1704 } else if let Some(client_order_id) = client_order_id {
1705 params.orig_cl_ord_id(client_order_id.as_str());
1706 } else {
1707 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1708 }
1709
1710 if let Some(quantity) = quantity {
1711 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1712 params.order_qty(quantity_to_u32(&quantity, &instrument));
1713 }
1714
1715 if let Some(price) = price {
1716 params.price(price.as_f64());
1717 }
1718
1719 if let Some(trigger_price) = trigger_price {
1720 params.stop_px(trigger_price.as_f64());
1721 }
1722
1723 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1724
1725 let response = self.inner.amend_order(params).await?;
1726
1727 let order: BitmexOrder = serde_json::from_value(response)?;
1728
1729 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1730 let reason = order
1731 .ord_rej_reason
1732 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1733 anyhow::bail!("Order modification rejected: {reason}");
1734 }
1735
1736 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1737 let ts_init = self.generate_ts_init();
1738
1739 parse_order_status_report(&order, &instrument, ts_init)
1740 }
1741
1742 pub async fn query_order(
1751 &self,
1752 instrument_id: InstrumentId,
1753 client_order_id: Option<ClientOrderId>,
1754 venue_order_id: Option<VenueOrderId>,
1755 ) -> anyhow::Result<Option<OrderStatusReport>> {
1756 let mut params = GetOrderParamsBuilder::default();
1757
1758 let filter_json = if let Some(client_order_id) = client_order_id {
1759 serde_json::json!({
1760 "clOrdID": client_order_id.to_string()
1761 })
1762 } else if let Some(venue_order_id) = venue_order_id {
1763 serde_json::json!({
1764 "orderID": venue_order_id.to_string()
1765 })
1766 } else {
1767 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1768 };
1769
1770 params.filter(filter_json);
1771 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775 let response = self.inner.get_orders(params).await?;
1776
1777 if response.is_empty() {
1778 return Ok(None);
1779 }
1780
1781 let order = &response[0];
1782
1783 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1784 let ts_init = self.generate_ts_init();
1785
1786 let report = parse_order_status_report(order, &instrument, ts_init)?;
1787
1788 Ok(Some(report))
1789 }
1790
1791 pub async fn request_order_status_report(
1800 &self,
1801 instrument_id: InstrumentId,
1802 client_order_id: Option<ClientOrderId>,
1803 venue_order_id: Option<VenueOrderId>,
1804 ) -> anyhow::Result<OrderStatusReport> {
1805 let mut params = GetOrderParamsBuilder::default();
1806 params.symbol(instrument_id.symbol.as_str());
1807
1808 if let Some(venue_order_id) = venue_order_id {
1809 params.filter(serde_json::json!({
1810 "orderID": venue_order_id.as_str()
1811 }));
1812 } else if let Some(client_order_id) = client_order_id {
1813 params.filter(serde_json::json!({
1814 "clOrdID": client_order_id.as_str()
1815 }));
1816 }
1817
1818 params.count(1i32);
1819 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1820
1821 let response = self.inner.get_orders(params).await?;
1822
1823 let order = response
1824 .into_iter()
1825 .next()
1826 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1827
1828 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1829 let ts_init = self.generate_ts_init();
1830
1831 parse_order_status_report(&order, &instrument, ts_init)
1832 }
1833
1834 pub async fn request_order_status_reports(
1843 &self,
1844 instrument_id: Option<InstrumentId>,
1845 open_only: bool,
1846 limit: Option<u32>,
1847 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1848 let mut params = GetOrderParamsBuilder::default();
1849
1850 if let Some(instrument_id) = &instrument_id {
1851 params.symbol(instrument_id.symbol.as_str());
1852 }
1853
1854 if open_only {
1855 params.filter(serde_json::json!({
1856 "open": true
1857 }));
1858 }
1859
1860 if let Some(limit) = limit {
1861 params.count(limit as i32);
1862 } else {
1863 params.count(500); }
1865
1866 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1869
1870 let response = self.inner.get_orders(params).await?;
1871
1872 let ts_init = self.generate_ts_init();
1873
1874 let mut reports = Vec::new();
1875
1876 for order in response {
1877 let Some(symbol) = order.symbol else {
1879 log::warn!("Order response missing symbol, skipping");
1880 continue;
1881 };
1882
1883 let Ok(instrument) = self.instrument_from_cache(symbol) else {
1884 log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
1885 continue;
1886 };
1887
1888 match parse_order_status_report(&order, &instrument, ts_init) {
1889 Ok(report) => reports.push(report),
1890 Err(e) => log::error!("Failed to parse order status report: {e}"),
1891 }
1892 }
1893
1894 Self::populate_linked_order_ids(&mut reports);
1895
1896 Ok(reports)
1897 }
1898
1899 pub async fn request_trades(
1905 &self,
1906 instrument_id: InstrumentId,
1907 start: Option<DateTime<Utc>>,
1908 end: Option<DateTime<Utc>>,
1909 limit: Option<u32>,
1910 ) -> anyhow::Result<Vec<TradeTick>> {
1911 let mut params = GetTradeParamsBuilder::default();
1912 params.symbol(instrument_id.symbol.as_str());
1913
1914 if let Some(start) = start {
1915 params.start_time(start);
1916 }
1917
1918 if let Some(end) = end {
1919 params.end_time(end);
1920 }
1921
1922 if let (Some(start), Some(end)) = (start, end) {
1923 anyhow::ensure!(
1924 start < end,
1925 "Invalid time range: start={start:?} end={end:?}",
1926 );
1927 }
1928
1929 if let Some(limit) = limit {
1930 let clamped_limit = limit.min(1000);
1931 if limit > 1000 {
1932 log::warn!(
1933 "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
1934 );
1935 }
1936 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1937 }
1938 params.reverse(false);
1939 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1940
1941 let response = self.inner.get_trades(params).await?;
1942
1943 let ts_init = self.generate_ts_init();
1944
1945 let mut parsed_trades = Vec::new();
1946
1947 for trade in response {
1948 if let Some(start) = start
1949 && trade.timestamp < start
1950 {
1951 continue;
1952 }
1953
1954 if let Some(end) = end
1955 && trade.timestamp > end
1956 {
1957 continue;
1958 }
1959
1960 let price_precision = self.get_price_precision(trade.symbol)?;
1961
1962 match parse_trade(trade, price_precision, ts_init) {
1963 Ok(trade) => parsed_trades.push(trade),
1964 Err(e) => log::error!("Failed to parse trade: {e}"),
1965 }
1966 }
1967
1968 Ok(parsed_trades)
1969 }
1970
1971 pub async fn request_bars(
1978 &self,
1979 mut bar_type: BarType,
1980 start: Option<DateTime<Utc>>,
1981 end: Option<DateTime<Utc>>,
1982 limit: Option<u32>,
1983 partial: bool,
1984 ) -> anyhow::Result<Vec<Bar>> {
1985 bar_type = bar_type.standard();
1986
1987 anyhow::ensure!(
1988 bar_type.aggregation_source() == AggregationSource::External,
1989 "Only EXTERNAL aggregation bars are supported"
1990 );
1991 anyhow::ensure!(
1992 bar_type.spec().price_type == PriceType::Last,
1993 "Only LAST price type bars are supported"
1994 );
1995 if let (Some(start), Some(end)) = (start, end) {
1996 anyhow::ensure!(
1997 start < end,
1998 "Invalid time range: start={start:?} end={end:?}"
1999 );
2000 }
2001
2002 let spec = bar_type.spec();
2003 let bin_size = match (spec.aggregation, spec.step.get()) {
2004 (BarAggregation::Minute, 1) => "1m",
2005 (BarAggregation::Minute, 5) => "5m",
2006 (BarAggregation::Hour, 1) => "1h",
2007 (BarAggregation::Day, 1) => "1d",
2008 _ => anyhow::bail!(
2009 "BitMEX does not support {}-{:?}-{:?} bars",
2010 spec.step.get(),
2011 spec.aggregation,
2012 spec.price_type,
2013 ),
2014 };
2015
2016 let instrument_id = bar_type.instrument_id();
2017 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2018
2019 let mut params = GetTradeBucketedParamsBuilder::default();
2020 params.symbol(instrument_id.symbol.as_str());
2021 params.bin_size(bin_size);
2022 if partial {
2023 params.partial(true);
2024 }
2025 if let Some(start) = start {
2026 params.start_time(start);
2027 }
2028 if let Some(end) = end {
2029 params.end_time(end);
2030 }
2031 if let Some(limit) = limit {
2032 let clamped_limit = limit.min(1000);
2033 if limit > 1000 {
2034 log::warn!(
2035 "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2036 );
2037 }
2038 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2039 }
2040 params.reverse(false);
2041 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2042
2043 let response = self.inner.get_trade_bucketed(params).await?;
2044 let ts_init = self.generate_ts_init();
2045 let mut bars = Vec::new();
2046
2047 for bin in response {
2048 if let Some(start) = start
2049 && bin.timestamp < start
2050 {
2051 continue;
2052 }
2053 if let Some(end) = end
2054 && bin.timestamp > end
2055 {
2056 continue;
2057 }
2058 if bin.symbol != instrument_id.symbol.inner() {
2059 log::warn!(
2060 "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2061 bin.symbol,
2062 instrument_id.symbol,
2063 );
2064 continue;
2065 }
2066
2067 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2068 Ok(bar) => bars.push(bar),
2069 Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2070 }
2071 }
2072
2073 Ok(bars)
2074 }
2075
2076 pub async fn request_fill_reports(
2082 &self,
2083 instrument_id: Option<InstrumentId>,
2084 limit: Option<u32>,
2085 ) -> anyhow::Result<Vec<FillReport>> {
2086 let mut params = GetExecutionParamsBuilder::default();
2087 if let Some(instrument_id) = instrument_id {
2088 params.symbol(instrument_id.symbol.as_str());
2089 }
2090 if let Some(limit) = limit {
2091 params.count(limit as i32);
2092 } else {
2093 params.count(500); }
2095 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2098
2099 let response = self.inner.get_executions(params).await?;
2100
2101 let ts_init = self.generate_ts_init();
2102
2103 let mut reports = Vec::new();
2104
2105 for exec in response {
2106 let Some(symbol) = exec.symbol else {
2108 log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2109 continue;
2110 };
2111 let symbol_str = symbol.to_string();
2112
2113 let instrument = match self.instrument_from_cache(symbol) {
2114 Ok(instrument) => instrument,
2115 Err(e) => {
2116 log::error!(
2117 "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2118 );
2119 continue;
2120 }
2121 };
2122
2123 match parse_fill_report(exec, &instrument, ts_init) {
2124 Ok(report) => reports.push(report),
2125 Err(e) => {
2126 let error_msg = e.to_string();
2128 if error_msg.starts_with("Skipping non-trade execution")
2129 || error_msg.starts_with("Skipping execution without order_id")
2130 {
2131 log::debug!("{e}");
2132 } else {
2133 log::error!("Failed to parse fill report: {e}");
2134 }
2135 }
2136 }
2137 }
2138
2139 Ok(reports)
2140 }
2141
2142 pub async fn request_position_status_reports(
2148 &self,
2149 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2150 let params = GetPositionParamsBuilder::default()
2151 .count(500) .build()
2153 .map_err(|e| anyhow::anyhow!(e))?;
2154
2155 let response = self.inner.get_positions(params).await?;
2156
2157 let ts_init = self.generate_ts_init();
2158
2159 let mut reports = Vec::new();
2160
2161 for pos in response {
2162 let symbol = pos.symbol;
2163 let instrument = match self.instrument_from_cache(symbol) {
2164 Ok(instrument) => instrument,
2165 Err(e) => {
2166 log::error!(
2167 "Instrument not found in cache for position parsing: symbol={}, {e}",
2168 pos.symbol.as_str(),
2169 );
2170 continue;
2171 }
2172 };
2173
2174 match parse_position_report(pos, &instrument, ts_init) {
2175 Ok(report) => reports.push(report),
2176 Err(e) => log::error!("Failed to parse position report: {e}"),
2177 }
2178 }
2179
2180 Ok(reports)
2181 }
2182
2183 pub async fn update_position_leverage(
2191 &self,
2192 symbol: &str,
2193 leverage: f64,
2194 ) -> anyhow::Result<PositionStatusReport> {
2195 let params = PostPositionLeverageParams {
2196 symbol: symbol.to_string(),
2197 leverage,
2198 target_account_id: None,
2199 };
2200
2201 let response = self.inner.update_position_leverage(params).await?;
2202
2203 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2204 let ts_init = self.generate_ts_init();
2205
2206 parse_position_report(response, &instrument, ts_init)
2207 }
2208}
2209
2210#[cfg(test)]
2211mod tests {
2212 use nautilus_core::UUID4;
2213 use nautilus_model::enums::OrderStatus;
2214 use rstest::rstest;
2215 use serde_json::json;
2216
2217 use super::*;
2218
2219 fn build_report(
2220 client_order_id: &str,
2221 venue_order_id: &str,
2222 contingency_type: ContingencyType,
2223 order_list_id: Option<&str>,
2224 ) -> OrderStatusReport {
2225 let mut report = OrderStatusReport::new(
2226 AccountId::from("BITMEX-1"),
2227 InstrumentId::from("XBTUSD.BITMEX"),
2228 Some(ClientOrderId::from(client_order_id)),
2229 VenueOrderId::from(venue_order_id),
2230 OrderSide::Buy,
2231 OrderType::Limit,
2232 TimeInForce::Gtc,
2233 OrderStatus::Accepted,
2234 Quantity::new(100.0, 0),
2235 Quantity::default(),
2236 UnixNanos::from(1_u64),
2237 UnixNanos::from(1_u64),
2238 UnixNanos::from(1_u64),
2239 Some(UUID4::new()),
2240 );
2241
2242 if let Some(id) = order_list_id {
2243 report = report.with_order_list_id(OrderListId::from(id));
2244 }
2245
2246 report.with_contingency_type(contingency_type)
2247 }
2248
2249 #[rstest]
2250 fn test_sign_request_generates_correct_headers() {
2251 let client = BitmexRawHttpClient::with_credentials(
2252 "test_api_key".to_string(),
2253 "test_api_secret".to_string(),
2254 "http://localhost:8080".to_string(),
2255 Some(60),
2256 None, None, None, None, None, None, None, )
2264 .expect("Failed to create test client");
2265
2266 let headers = client
2267 .sign_request(&Method::GET, "/api/v1/order", None)
2268 .unwrap();
2269
2270 assert!(headers.contains_key("api-key"));
2271 assert!(headers.contains_key("api-signature"));
2272 assert!(headers.contains_key("api-expires"));
2273 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2274 }
2275
2276 #[rstest]
2277 fn test_sign_request_with_body() {
2278 let client = BitmexRawHttpClient::with_credentials(
2279 "test_api_key".to_string(),
2280 "test_api_secret".to_string(),
2281 "http://localhost:8080".to_string(),
2282 Some(60),
2283 None, None, None, None, None, None, None, )
2291 .expect("Failed to create test client");
2292
2293 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2294 let body_bytes = serde_json::to_vec(&body).unwrap();
2295
2296 let headers_without_body = client
2297 .sign_request(&Method::POST, "/api/v1/order", None)
2298 .unwrap();
2299 let headers_with_body = client
2300 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2301 .unwrap();
2302
2303 assert_ne!(
2305 headers_without_body.get("api-signature").unwrap(),
2306 headers_with_body.get("api-signature").unwrap()
2307 );
2308 }
2309
2310 #[rstest]
2311 fn test_sign_request_uses_custom_recv_window() {
2312 let client_default = BitmexRawHttpClient::with_credentials(
2313 "test_api_key".to_string(),
2314 "test_api_secret".to_string(),
2315 "http://localhost:8080".to_string(),
2316 Some(60),
2317 None,
2318 None,
2319 None,
2320 None, None, None, None, )
2325 .expect("Failed to create test client");
2326
2327 let client_custom = BitmexRawHttpClient::with_credentials(
2328 "test_api_key".to_string(),
2329 "test_api_secret".to_string(),
2330 "http://localhost:8080".to_string(),
2331 Some(60),
2332 None,
2333 None,
2334 None,
2335 Some(30_000), None, None, None, )
2340 .expect("Failed to create test client");
2341
2342 let headers_default = client_default
2343 .sign_request(&Method::GET, "/api/v1/order", None)
2344 .unwrap();
2345 let headers_custom = client_custom
2346 .sign_request(&Method::GET, "/api/v1/order", None)
2347 .unwrap();
2348
2349 let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2351 let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2352
2353 let now = Utc::now().timestamp();
2355 assert!(expires_default > now);
2356 assert!(expires_custom > now);
2357
2358 assert!(expires_custom > expires_default);
2360
2361 let diff = expires_custom - expires_default;
2364 assert!((18..=25).contains(&diff));
2365 }
2366
2367 #[rstest]
2368 fn test_populate_linked_order_ids_from_order_list() {
2369 let base = "O-20250922-002219-001-000";
2370 let entry = format!("{base}-1");
2371 let stop = format!("{base}-2");
2372 let take = format!("{base}-3");
2373
2374 let mut reports = vec![
2375 build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2376 build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2377 build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2378 ];
2379
2380 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2381
2382 assert_eq!(
2383 reports[0].linked_order_ids,
2384 Some(vec![
2385 ClientOrderId::from(stop.as_str()),
2386 ClientOrderId::from(take.as_str()),
2387 ]),
2388 );
2389 assert_eq!(
2390 reports[1].linked_order_ids,
2391 Some(vec![
2392 ClientOrderId::from(entry.as_str()),
2393 ClientOrderId::from(take.as_str()),
2394 ]),
2395 );
2396 assert_eq!(
2397 reports[2].linked_order_ids,
2398 Some(vec![
2399 ClientOrderId::from(entry.as_str()),
2400 ClientOrderId::from(stop.as_str()),
2401 ]),
2402 );
2403 }
2404
2405 #[rstest]
2406 fn test_populate_linked_order_ids_from_id_prefix() {
2407 let base = "O-20250922-002220-001-000";
2408 let entry = format!("{base}-1");
2409 let stop = format!("{base}-2");
2410 let take = format!("{base}-3");
2411
2412 let mut reports = vec![
2413 build_report(&entry, "V-1", ContingencyType::Oto, None),
2414 build_report(&stop, "V-2", ContingencyType::Ouo, None),
2415 build_report(&take, "V-3", ContingencyType::Ouo, None),
2416 ];
2417
2418 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2419
2420 assert_eq!(
2421 reports[0].linked_order_ids,
2422 Some(vec![
2423 ClientOrderId::from(stop.as_str()),
2424 ClientOrderId::from(take.as_str()),
2425 ]),
2426 );
2427 assert_eq!(
2428 reports[1].linked_order_ids,
2429 Some(vec![
2430 ClientOrderId::from(entry.as_str()),
2431 ClientOrderId::from(take.as_str()),
2432 ]),
2433 );
2434 assert_eq!(
2435 reports[2].linked_order_ids,
2436 Some(vec![
2437 ClientOrderId::from(entry.as_str()),
2438 ClientOrderId::from(stop.as_str()),
2439 ]),
2440 );
2441 }
2442
2443 #[rstest]
2444 fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2445 let base = "O-20250922-002221-001-000";
2446 let entry = format!("{base}-1");
2447 let passive = format!("{base}-2");
2448
2449 let mut reports = vec![
2450 build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2451 build_report(&passive, "V-2", ContingencyType::Ouo, None),
2452 ];
2453
2454 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2455
2456 assert!(reports[0].linked_order_ids.is_none());
2458
2459 assert!(reports[1].linked_order_ids.is_none());
2461 assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2462 }
2463}