1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
46 TimeInForce, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65 error::{BitmexErrorResponse, BitmexHttpError},
66 models::{
67 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69 },
70 query::{
71 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74 PostPositionLeverageParams, PutOrderParams,
75 },
76};
77use crate::{
78 common::{
79 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80 credential::Credential,
81 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
82 parse::{parse_account_state, quantity_to_u32},
83 },
84 http::{
85 parse::{
86 InstrumentParseResult, parse_fill_report, parse_instrument_any,
87 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
88 },
89 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
90 },
91 websocket::messages::BitmexMarginMsg,
92};
93
// Default request quotas, used when the caller does not supply explicit limits.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

// Keys under which the per-second and per-minute quotas are registered with the HTTP client.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";

/// Interned rate limit keys, built once on first use and cloned for each request.
static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
    vec![
        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
        Ustr::from(BITMEX_MINUTE_RATE_KEY),
    ]
});
112
/// Generic envelope that carries a list of items of type `T` under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The items carried by the response.
    pub data: Vec<T>,
}
119
/// Low-level BitMEX HTTP client which signs, rate limits, and retries raw API requests.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// URL prefix prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP client, constructed with the rate limiter quotas.
    client: HttpClient,
    /// API credential used to sign requests; `None` for an unauthenticated client.
    credential: Option<Credential>,
    /// Validity window in milliseconds used to derive the `api-expires` header.
    recv_window_ms: u64,
    /// Drives request attempts and retries.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token handed to the retry manager; canceled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
}
148
149impl Default for BitmexRawHttpClient {
150 fn default() -> Self {
151 Self::new(None, Some(60), None, None, None, None, None, None, None)
152 .expect("Failed to create default BitmexHttpInnerClient")
153 }
154}
155
156impl BitmexRawHttpClient {
157 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 base_url: Option<String>,
169 timeout_secs: Option<u64>,
170 max_retries: Option<u32>,
171 retry_delay_ms: Option<u64>,
172 retry_delay_max_ms: Option<u64>,
173 recv_window_ms: Option<u64>,
174 max_requests_per_second: Option<u32>,
175 max_requests_per_minute: Option<u32>,
176 proxy_url: Option<String>,
177 ) -> Result<Self, BitmexHttpError> {
178 let retry_config = RetryConfig {
179 max_retries: max_retries.unwrap_or(3),
180 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
181 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
182 backoff_factor: 2.0,
183 jitter_ms: 1000,
184 operation_timeout_ms: Some(60_000),
185 immediate_first: false,
186 max_elapsed_ms: Some(180_000),
187 };
188
189 let retry_manager = RetryManager::new(retry_config);
190
191 let max_req_per_sec =
192 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
193 let max_req_per_min =
194 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
195
196 Ok(Self {
197 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
198 client: HttpClient::new(
199 Self::default_headers(),
200 vec![],
201 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
202 Some(Self::default_quota(max_req_per_sec)),
203 timeout_secs,
204 proxy_url,
205 )
206 .map_err(|e| {
207 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
208 })?,
209 credential: None,
210 recv_window_ms: recv_window_ms.unwrap_or(10_000),
211 retry_manager,
212 cancellation_token: CancellationToken::new(),
213 })
214 }
215
216 #[allow(clippy::too_many_arguments)]
223 pub fn with_credentials(
224 api_key: String,
225 api_secret: String,
226 base_url: String,
227 timeout_secs: Option<u64>,
228 max_retries: Option<u32>,
229 retry_delay_ms: Option<u64>,
230 retry_delay_max_ms: Option<u64>,
231 recv_window_ms: Option<u64>,
232 max_requests_per_second: Option<u32>,
233 max_requests_per_minute: Option<u32>,
234 proxy_url: Option<String>,
235 ) -> Result<Self, BitmexHttpError> {
236 let retry_config = RetryConfig {
237 max_retries: max_retries.unwrap_or(3),
238 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
239 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
240 backoff_factor: 2.0,
241 jitter_ms: 1000,
242 operation_timeout_ms: Some(60_000),
243 immediate_first: false,
244 max_elapsed_ms: Some(180_000),
245 };
246
247 let retry_manager = RetryManager::new(retry_config);
248
249 let max_req_per_sec =
250 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
251 let max_req_per_min =
252 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
253
254 Ok(Self {
255 base_url,
256 client: HttpClient::new(
257 Self::default_headers(),
258 vec![],
259 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
260 Some(Self::default_quota(max_req_per_sec)),
261 timeout_secs,
262 proxy_url,
263 )
264 .map_err(|e| {
265 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
266 })?,
267 credential: Some(Credential::new(api_key, api_secret)),
268 recv_window_ms: recv_window_ms.unwrap_or(10_000),
269 retry_manager,
270 cancellation_token: CancellationToken::new(),
271 })
272 }
273
274 fn default_headers() -> HashMap<String, String> {
275 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
276 }
277
278 fn default_quota(max_requests_per_second: u32) -> Quota {
279 Quota::per_second(
280 NonZeroU32::new(max_requests_per_second)
281 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
282 )
283 }
284
285 fn rate_limiter_quotas(
286 max_requests_per_second: u32,
287 max_requests_per_minute: u32,
288 ) -> Vec<(String, Quota)> {
289 let per_sec_quota = Quota::per_second(
290 NonZeroU32::new(max_requests_per_second)
291 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
292 );
293 let per_min_quota =
294 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
295 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
296 }));
297
298 vec![
299 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
300 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
301 ]
302 }
303
304 fn rate_limit_keys() -> Vec<Ustr> {
305 RATE_LIMIT_KEYS.clone()
306 }
307
/// Cancels the client's cancellation token, which is passed to the retry manager
/// for every request.
pub fn cancel_all_requests(&self) {
    self.cancellation_token.cancel();
}
312
/// Returns a reference to the cancellation token used for this client's requests.
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
317
318 fn sign_request(
319 &self,
320 method: &Method,
321 endpoint: &str,
322 body: Option<&[u8]>,
323 ) -> Result<HashMap<String, String>, BitmexHttpError> {
324 let credential = self
325 .credential
326 .as_ref()
327 .ok_or(BitmexHttpError::MissingCredentials)?;
328
329 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
330 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
331
332 let full_path = if endpoint.starts_with("/api/v1") {
333 endpoint.to_string()
334 } else {
335 format!("/api/v1{endpoint}")
336 };
337
338 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
339
340 let mut headers = HashMap::new();
341 headers.insert("api-expires".to_string(), expires.to_string());
342 headers.insert("api-key".to_string(), credential.api_key.to_string());
343 headers.insert("api-signature".to_string(), signature);
344
345 if body.is_some()
347 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
348 {
349 headers.insert(
350 "Content-Type".to_string(),
351 "application/x-www-form-urlencoded".to_string(),
352 );
353 }
354
355 Ok(headers)
356 }
357
358 async fn send_request<T: DeserializeOwned, P: Serialize>(
359 &self,
360 method: Method,
361 endpoint: &str,
362 params: Option<&P>,
363 body: Option<Vec<u8>>,
364 authenticate: bool,
365 ) -> Result<T, BitmexHttpError> {
366 let endpoint = endpoint.to_string();
367 let method_clone = method.clone();
368 let body_clone = body.clone();
369
370 let params_str = if method == Method::GET || method == Method::DELETE {
373 params
374 .map(serde_urlencoded::to_string)
375 .transpose()
376 .map_err(|e| {
377 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
378 })?
379 } else {
380 None
381 };
382
383 let full_endpoint = match params_str {
384 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
385 _ => endpoint.clone(),
386 };
387
388 let url = format!("{}{}", self.base_url, full_endpoint);
389
390 let operation = || {
391 let url = url.clone();
392 let method = method_clone.clone();
393 let body = body_clone.clone();
394 let full_endpoint = full_endpoint.clone();
395
396 async move {
397 let headers = if authenticate {
398 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
399 } else {
400 None
401 };
402
403 let rate_keys = Self::rate_limit_keys();
404 let resp = self
405 .client
406 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
407 .await?;
408
409 if resp.status.is_success() {
410 serde_json::from_slice(&resp.body).map_err(Into::into)
411 } else if let Ok(error_resp) =
412 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
413 {
414 Err(error_resp.into())
415 } else {
416 Err(BitmexHttpError::UnexpectedStatus {
417 status: StatusCode::from_u16(resp.status.as_u16())
418 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
419 body: String::from_utf8_lossy(&resp.body).to_string(),
420 })
421 }
422 }
423 };
424
425 let should_retry = |error: &BitmexHttpError| -> bool {
442 match error {
443 BitmexHttpError::NetworkError(_) => true,
444 BitmexHttpError::UnexpectedStatus { status, .. } => {
445 status.as_u16() >= 500 || status.as_u16() == 429
446 }
447 BitmexHttpError::BitmexError {
448 error_name,
449 message,
450 } => {
451 error_name == "RateLimitError"
452 || (error_name == "HTTPError"
453 && message.to_lowercase().contains("rate limit"))
454 }
455 _ => false,
456 }
457 };
458
459 let create_error = |msg: String| -> BitmexHttpError {
460 if msg == "canceled" {
461 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
462 } else {
463 BitmexHttpError::NetworkError(msg)
464 }
465 };
466
467 self.retry_manager
468 .execute_with_retry_with_cancel(
469 endpoint.as_str(),
470 operation,
471 should_retry,
472 create_error,
473 &self.cancellation_token,
474 )
475 .await
476 }
477
478 pub async fn get_instruments(
484 &self,
485 active_only: bool,
486 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
487 let path = if active_only {
488 "/instrument/active"
489 } else {
490 "/instrument"
491 };
492 self.send_request::<_, ()>(Method::GET, path, None, None, false)
493 .await
494 }
495
496 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
506 let response: BitmexApiInfo = self
507 .send_request::<_, ()>(Method::GET, "", None, None, false)
508 .await?;
509 Ok(response.timestamp)
510 }
511
512 pub async fn get_instrument(
523 &self,
524 symbol: &str,
525 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
526 let path = &format!("/instrument?symbol={symbol}");
527 let instruments: Vec<BitmexInstrument> = self
528 .send_request::<_, ()>(Method::GET, path, None, None, false)
529 .await?;
530
531 Ok(instruments.into_iter().next())
532 }
533
534 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
540 let endpoint = "/user/wallet";
541 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
542 .await
543 }
544
545 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
551 let path = format!("/user/margin?currency={currency}");
552 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
553 .await
554 }
555
556 pub async fn get_trades(
566 &self,
567 params: GetTradeParams,
568 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
569 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
570 .await
571 }
572
573 pub async fn get_trade_bucketed(
579 &self,
580 params: GetTradeBucketedParams,
581 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
582 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
583 .await
584 }
585
586 pub async fn get_orders(
596 &self,
597 params: GetOrderParams,
598 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
599 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
600 .await
601 }
602
603 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
613 let body = serde_urlencoded::to_string(¶ms)
615 .map_err(|e| {
616 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
617 })?
618 .into_bytes();
619 let path = "/order";
620 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
621 .await
622 }
623
624 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
634 let body = serde_urlencoded::to_string(¶ms)
636 .map_err(|e| {
637 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
638 })?
639 .into_bytes();
640 let path = "/order";
641 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
642 .await
643 }
644
645 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
655 let body = serde_urlencoded::to_string(¶ms)
657 .map_err(|e| {
658 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
659 })?
660 .into_bytes();
661 let path = "/order";
662 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
663 .await
664 }
665
666 pub async fn cancel_all_orders(
680 &self,
681 params: DeleteAllOrdersParams,
682 ) -> Result<Value, BitmexHttpError> {
683 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
684 .await
685 }
686
687 pub async fn get_executions(
697 &self,
698 params: GetExecutionParams,
699 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
700 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
701 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
702 })?;
703 let path = format!("/execution/tradeHistory?{query}");
704 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
705 .await
706 }
707
708 pub async fn get_positions(
718 &self,
719 params: GetPositionParams,
720 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
721 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
722 .await
723 }
724
725 pub async fn update_position_leverage(
735 &self,
736 params: PostPositionLeverageParams,
737 ) -> Result<BitmexPosition, BitmexHttpError> {
738 let body = serde_urlencoded::to_string(¶ms)
740 .map_err(|e| {
741 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
742 })?
743 .into_bytes();
744 let path = "/position/leverage";
745 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
746 .await
747 }
748}
749
/// Higher-level BitMEX HTTP client which wraps a shared [`BitmexRawHttpClient`] and
/// keeps a cache of parsed instruments.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
)]
pub struct BitmexHttpClient {
    /// Shared raw client which performs the actual requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Instruments keyed by raw symbol; shared between clones of this client.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set once an instrument has been cached through `cache_instrument`.
    cache_initialized: AtomicBool,
}
764
765impl Clone for BitmexHttpClient {
766 fn clone(&self) -> Self {
767 let cache_initialized = AtomicBool::new(false);
768
769 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
770 if is_initialized {
771 cache_initialized.store(true, Ordering::Release);
772 }
773
774 Self {
775 inner: self.inner.clone(),
776 instruments_cache: self.instruments_cache.clone(),
777 cache_initialized,
778 }
779 }
780}
781
782impl Default for BitmexHttpClient {
783 fn default() -> Self {
784 Self::new(
785 None,
786 None,
787 None,
788 false,
789 Some(60),
790 None,
791 None,
792 None,
793 None,
794 None,
795 None,
796 None, )
798 .expect("Failed to create default BitmexHttpClient")
799 }
800}
801
802impl BitmexHttpClient {
803 #[allow(clippy::too_many_arguments)]
809 pub fn new(
810 base_url: Option<String>,
811 api_key: Option<String>,
812 api_secret: Option<String>,
813 testnet: bool,
814 timeout_secs: Option<u64>,
815 max_retries: Option<u32>,
816 retry_delay_ms: Option<u64>,
817 retry_delay_max_ms: Option<u64>,
818 recv_window_ms: Option<u64>,
819 max_requests_per_second: Option<u32>,
820 max_requests_per_minute: Option<u32>,
821 proxy_url: Option<String>,
822 ) -> Result<Self, BitmexHttpError> {
823 let url = base_url.unwrap_or_else(|| {
825 if testnet {
826 BITMEX_HTTP_TESTNET_URL.to_string()
827 } else {
828 BITMEX_HTTP_URL.to_string()
829 }
830 });
831
832 let inner = match (api_key, api_secret) {
833 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
834 key,
835 secret,
836 url,
837 timeout_secs,
838 max_retries,
839 retry_delay_ms,
840 retry_delay_max_ms,
841 recv_window_ms,
842 max_requests_per_second,
843 max_requests_per_minute,
844 proxy_url,
845 )?,
846 _ => BitmexRawHttpClient::new(
847 Some(url),
848 timeout_secs,
849 max_retries,
850 retry_delay_ms,
851 retry_delay_max_ms,
852 recv_window_ms,
853 max_requests_per_second,
854 max_requests_per_minute,
855 proxy_url,
856 )?,
857 };
858
859 Ok(Self {
860 inner: Arc::new(inner),
861 instruments_cache: Arc::new(DashMap::new()),
862 cache_initialized: AtomicBool::new(false),
863 })
864 }
865
/// Creates a client by calling [`Self::with_credentials`] with every argument unset,
/// so credentials are resolved from environment variables and all settings use defaults.
///
/// # Errors
///
/// Returns an error if `with_credentials` fails.
pub fn from_env() -> anyhow::Result<Self> {
    Self::with_credentials(
        None, None, None, None, None, None, None, None, None, None, None,
    )
    .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
}
878
879 #[allow(clippy::too_many_arguments)]
889 pub fn with_credentials(
890 api_key: Option<String>,
891 api_secret: Option<String>,
892 base_url: Option<String>,
893 timeout_secs: Option<u64>,
894 max_retries: Option<u32>,
895 retry_delay_ms: Option<u64>,
896 retry_delay_max_ms: Option<u64>,
897 recv_window_ms: Option<u64>,
898 max_requests_per_second: Option<u32>,
899 max_requests_per_minute: Option<u32>,
900 proxy_url: Option<String>,
901 ) -> anyhow::Result<Self> {
902 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
904
905 let (key_var, secret_var) = if testnet {
907 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
908 } else {
909 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
910 };
911
912 let api_key = get_or_env_var_opt(api_key, key_var);
913 let api_secret = get_or_env_var_opt(api_secret, secret_var);
914
915 if api_key.is_some() && api_secret.is_none() {
917 anyhow::bail!("{secret_var} is required when {key_var} is provided");
918 }
919 if api_key.is_none() && api_secret.is_some() {
920 anyhow::bail!("{key_var} is required when {secret_var} is provided");
921 }
922
923 Self::new(
924 base_url,
925 api_key,
926 api_secret,
927 testnet,
928 timeout_secs,
929 max_retries,
930 retry_delay_ms,
931 retry_delay_max_ms,
932 recv_window_ms,
933 max_requests_per_second,
934 max_requests_per_minute,
935 proxy_url,
936 )
937 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
938 }
939
/// Returns the base URL of the underlying raw client.
#[must_use]
pub fn base_url(&self) -> &str {
    self.inner.base_url.as_str()
}
945
/// Returns the API key of the underlying raw client, or `None` if it has no credential.
#[must_use]
pub fn api_key(&self) -> Option<&str> {
    self.inner.credential.as_ref().map(|c| c.api_key.as_str())
}
951
/// Returns the credential's masked API key, or `None` if the client has no credential.
#[must_use]
pub fn api_key_masked(&self) -> Option<String> {
    self.inner.credential.as_ref().map(|c| c.api_key_masked())
}
957
/// Requests the server time by delegating to the raw client.
///
/// # Errors
///
/// Returns an error if the underlying request fails.
pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
    self.inner.get_server_time().await
}
968
/// Returns the current time in nanoseconds from the atomic realtime clock, used as
/// the `ts_init` timestamp for parsed objects.
fn generate_ts_init(&self) -> UnixNanos {
    get_atomic_clock_realtime().get_time_ns()
}
973
974 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
976 matches!(
977 contingency_type,
978 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
979 )
980 }
981
982 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
984 matches!(
985 contingency_type,
986 ContingencyType::Oco | ContingencyType::Oto
987 )
988 }
989
/// Fills in `linked_order_ids` and `parent_order_id` on contingent order status reports.
///
/// Reports are grouped two ways: by `order_list_id`, and by the part of the client
/// order ID before its last `-` (contingent orders only). Each contingent report which
/// does not already carry linked IDs receives the other members of its group, trying
/// the order list grouping first and the prefix grouping second. A contingent report
/// for which neither grouping yields a peer is downgraded to
/// `ContingencyType::NoContingency` with its linked and parent IDs cleared.
///
/// Reports without a client order ID, reports which already have linked IDs, and
/// non-contingent reports are left untouched.
fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
    let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
    let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
    let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
    let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

    // First pass: build the groups and record the first parent candidate of each group
    for report in reports.iter() {
        let Some(client_order_id) = report.client_order_id else {
            continue;
        };

        if let Some(order_list_id) = report.order_list_id {
            order_list_groups
                .entry(order_list_id)
                .or_default()
                .push(client_order_id);

            // `or_insert` keeps the first parent encountered for the list
            if Self::is_parent_contingency(report.contingency_type) {
                order_list_parents
                    .entry(order_list_id)
                    .or_insert(client_order_id);
            }
        }

        // Prefix groups only collect contingent orders whose ID contains a `-`
        if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
            && Self::is_contingent_order(report.contingency_type)
        {
            prefix_groups
                .entry(base.to_owned())
                .or_default()
                .push(client_order_id);

            if Self::is_parent_contingency(report.contingency_type) {
                prefix_parents
                    .entry(base.to_owned())
                    .or_insert(client_order_id);
            }
        }
    }

    // Second pass: assign linked and parent IDs from the groups
    for report in reports.iter_mut() {
        let Some(client_order_id) = report.client_order_id else {
            continue;
        };

        // Linked IDs which are already present are never overwritten
        if report.linked_order_ids.is_some() {
            continue;
        }

        if !Self::is_contingent_order(report.contingency_type) {
            continue;
        }

        // Preferred source: peers sharing the same order list ID
        if let Some(order_list_id) = report.order_list_id
            && let Some(group) = order_list_groups.get(&order_list_id)
        {
            let mut linked: Vec<ClientOrderId> = group
                .iter()
                .copied()
                .filter(|candidate| candidate != &client_order_id)
                .collect();

            if !linked.is_empty() {
                if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                    if client_order_id != *parent_id {
                        // Key 0 for the parent, 1 for the rest: moves the parent to the front
                        linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                        report.parent_order_id = Some(*parent_id);
                    } else {
                        report.parent_order_id = None;
                    }
                } else {
                    report.parent_order_id = None;
                }

                tracing::trace!(
                    client_order_id = ?client_order_id,
                    order_list_id = ?order_list_id,
                    contingency_type = ?report.contingency_type,
                    linked_order_ids = ?linked,
                    "BitMEX linked ids sourced from order list id",
                );
                report.linked_order_ids = Some(linked);
                continue;
            }

            tracing::trace!(
                client_order_id = ?client_order_id,
                order_list_id = ?order_list_id,
                contingency_type = ?report.contingency_type,
                order_list_group = ?group,
                "BitMEX order list id group had no peers",
            );
            report.parent_order_id = None;
        } else if report.order_list_id.is_none() {
            report.parent_order_id = None;
        }

        // Fallback source: peers sharing the same client order ID prefix
        if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
            && let Some(group) = prefix_groups.get(base)
        {
            let mut linked: Vec<ClientOrderId> = group
                .iter()
                .copied()
                .filter(|candidate| candidate != &client_order_id)
                .collect();

            if !linked.is_empty() {
                if let Some(parent_id) = prefix_parents.get(base) {
                    if client_order_id != *parent_id {
                        // Key 0 for the parent, 1 for the rest: moves the parent to the front
                        linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                        report.parent_order_id = Some(*parent_id);
                    } else {
                        report.parent_order_id = None;
                    }
                } else {
                    report.parent_order_id = None;
                }

                tracing::trace!(
                    client_order_id = ?client_order_id,
                    contingency_type = ?report.contingency_type,
                    base = base,
                    linked_order_ids = ?linked,
                    "BitMEX linked ids constructed from client order id prefix",
                );
                report.linked_order_ids = Some(linked);
                continue;
            }

            tracing::trace!(
                client_order_id = ?client_order_id,
                contingency_type = ?report.contingency_type,
                base = base,
                prefix_group = ?group,
                "BitMEX client order id prefix group had no peers",
            );
            report.parent_order_id = None;
        } else if client_order_id.as_str().contains('-') {
            report.parent_order_id = None;
        }

        // No peers were found by either grouping: downgrade the report. Non-contingent
        // reports were skipped above, so this check holds for every report reaching it.
        if Self::is_contingent_order(report.contingency_type) {
            tracing::warn!(
                client_order_id = ?report.client_order_id,
                order_list_id = ?report.order_list_id,
                contingency_type = ?report.contingency_type,
                "BitMEX order status report missing linked ids after grouping",
            );
            report.contingency_type = ContingencyType::NoContingency;
            report.parent_order_id = None;
        }

        report.linked_order_ids = None;
    }
}
1147
/// Cancels pending requests by delegating to the raw client's `cancel_all_requests`.
pub fn cancel_all_requests(&self) {
    self.inner.cancel_all_requests();
}
1152
/// Returns a clone of the raw client's cancellation token.
pub fn cancellation_token(&self) -> CancellationToken {
    self.inner.cancellation_token().clone()
}
1157
1158 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1162 self.instruments_cache
1163 .insert(instrument.raw_symbol().inner(), instrument);
1164 self.cache_initialized.store(true, Ordering::Release);
1165 }
1166
1167 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1169 self.instruments_cache
1170 .get(symbol)
1171 .map(|entry| entry.value().clone())
1172 }
1173
1174 pub async fn request_instrument(
1182 &self,
1183 instrument_id: InstrumentId,
1184 ) -> anyhow::Result<Option<InstrumentAny>> {
1185 let response = self
1186 .inner
1187 .get_instrument(instrument_id.symbol.as_str())
1188 .await?;
1189
1190 let instrument = match response {
1191 Some(instrument) => instrument,
1192 None => return Ok(None),
1193 };
1194
1195 let ts_init = self.generate_ts_init();
1196
1197 match parse_instrument_any(&instrument, ts_init) {
1198 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1199 InstrumentParseResult::Unsupported {
1200 symbol,
1201 instrument_type,
1202 } => {
1203 tracing::debug!(
1204 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1205 );
1206 Ok(None)
1207 }
1208 InstrumentParseResult::Inactive { symbol, state } => {
1209 tracing::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1210 Ok(None)
1211 }
1212 InstrumentParseResult::Failed {
1213 symbol,
1214 instrument_type,
1215 error,
1216 } => {
1217 tracing::error!(
1218 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1219 );
1220 Ok(None)
1221 }
1222 }
1223 }
1224
1225 pub async fn request_instruments(
1231 &self,
1232 active_only: bool,
1233 ) -> anyhow::Result<Vec<InstrumentAny>> {
1234 let instruments = self.inner.get_instruments(active_only).await?;
1235 let ts_init = self.generate_ts_init();
1236
1237 let mut parsed_instruments = Vec::new();
1238 let mut skipped_count = 0;
1239 let mut inactive_count = 0;
1240 let mut failed_count = 0;
1241 let total_count = instruments.len();
1242
1243 for inst in instruments {
1244 match parse_instrument_any(&inst, ts_init) {
1245 InstrumentParseResult::Ok(instrument_any) => {
1246 parsed_instruments.push(*instrument_any);
1247 }
1248 InstrumentParseResult::Unsupported {
1249 symbol,
1250 instrument_type,
1251 } => {
1252 skipped_count += 1;
1253 tracing::debug!(
1254 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1255 );
1256 }
1257 InstrumentParseResult::Inactive { symbol, state } => {
1258 inactive_count += 1;
1259 tracing::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1260 }
1261 InstrumentParseResult::Failed {
1262 symbol,
1263 instrument_type,
1264 error,
1265 } => {
1266 failed_count += 1;
1267 tracing::error!(
1268 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1269 );
1270 }
1271 }
1272 }
1273
1274 if skipped_count > 0 {
1275 tracing::info!(
1276 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1277 );
1278 }
1279
1280 if inactive_count > 0 {
1281 tracing::info!(
1282 "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1283 );
1284 }
1285
1286 if failed_count > 0 {
1287 tracing::error!(
1288 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1289 parsed_instruments.len()
1290 );
1291 }
1292
1293 Ok(parsed_instruments)
1294 }
1295
1296 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1306 let inner = self.inner.clone();
1307 inner.get_wallet().await
1308 }
1309
1310 pub async fn get_orders(
1320 &self,
1321 params: GetOrderParams,
1322 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1323 let inner = self.inner.clone();
1324 inner.get_orders(params).await
1325 }
1326
1327 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1333 self.get_instrument(&symbol).ok_or_else(|| {
1334 anyhow::anyhow!(
1335 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1336 )
1337 })
1338 }
1339
1340 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1347 self.instrument_from_cache(symbol)
1348 .map(|instrument| instrument.price_precision())
1349 }
1350
1351 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1357 self.inner
1358 .get_margin(currency)
1359 .await
1360 .map_err(|e| anyhow::anyhow!(e))
1361 }
1362
1363 pub async fn request_account_state(
1369 &self,
1370 account_id: AccountId,
1371 ) -> anyhow::Result<AccountState> {
1372 let margin = self
1374 .inner
1375 .get_margin("XBt")
1376 .await
1377 .map_err(|e| anyhow::anyhow!(e))?;
1378
1379 let ts_init =
1380 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1381
1382 let margin_msg = BitmexMarginMsg {
1384 account: margin.account,
1385 currency: margin.currency,
1386 risk_limit: margin.risk_limit,
1387 amount: margin.amount,
1388 prev_realised_pnl: margin.prev_realised_pnl,
1389 gross_comm: margin.gross_comm,
1390 gross_open_cost: margin.gross_open_cost,
1391 gross_open_premium: margin.gross_open_premium,
1392 gross_exec_cost: margin.gross_exec_cost,
1393 gross_mark_value: margin.gross_mark_value,
1394 risk_value: margin.risk_value,
1395 init_margin: margin.init_margin,
1396 maint_margin: margin.maint_margin,
1397 target_excess_margin: margin.target_excess_margin,
1398 realised_pnl: margin.realised_pnl,
1399 unrealised_pnl: margin.unrealised_pnl,
1400 wallet_balance: margin.wallet_balance,
1401 margin_balance: margin.margin_balance,
1402 margin_leverage: margin.margin_leverage,
1403 margin_used_pcnt: margin.margin_used_pcnt,
1404 excess_margin: margin.excess_margin,
1405 available_margin: margin.available_margin,
1406 withdrawable_margin: margin.withdrawable_margin,
1407 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1410 foreign_margin_balance: None,
1411 foreign_requirement: None,
1412 };
1413
1414 parse_account_state(&margin_msg, account_id, ts_init)
1415 }
1416
1417 #[allow(clippy::too_many_arguments)]
1424 pub async fn submit_order(
1425 &self,
1426 instrument_id: InstrumentId,
1427 client_order_id: ClientOrderId,
1428 order_side: OrderSide,
1429 order_type: OrderType,
1430 quantity: Quantity,
1431 time_in_force: TimeInForce,
1432 price: Option<Price>,
1433 trigger_price: Option<Price>,
1434 trigger_type: Option<TriggerType>,
1435 display_qty: Option<Quantity>,
1436 post_only: bool,
1437 reduce_only: bool,
1438 order_list_id: Option<OrderListId>,
1439 contingency_type: Option<ContingencyType>,
1440 ) -> anyhow::Result<OrderStatusReport> {
1441 use crate::common::enums::{
1442 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1443 };
1444
1445 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1446
1447 let mut params = super::query::PostOrderParamsBuilder::default();
1448 params.text(NAUTILUS_TRADER);
1449 params.symbol(instrument_id.symbol.as_str());
1450 params.cl_ord_id(client_order_id.as_str());
1451
1452 let side = BitmexSide::try_from_order_side(order_side)?;
1453 params.side(side);
1454
1455 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1456 params.ord_type(ord_type);
1457
1458 params.order_qty(quantity_to_u32(&quantity, &instrument));
1459
1460 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1461 params.time_in_force(tif);
1462
1463 if let Some(price) = price {
1464 params.price(price.as_f64());
1465 }
1466
1467 if let Some(trigger_price) = trigger_price {
1468 params.stop_px(trigger_price.as_f64());
1469 }
1470
1471 if let Some(display_qty) = display_qty {
1472 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1473 }
1474
1475 if let Some(order_list_id) = order_list_id {
1476 params.cl_ord_link_id(order_list_id.as_str());
1477 }
1478
1479 let mut exec_inst = Vec::new();
1480
1481 if post_only {
1482 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1483 }
1484
1485 if reduce_only {
1486 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1487 }
1488
1489 if trigger_price.is_some()
1490 && let Some(trigger_type) = trigger_type
1491 {
1492 match trigger_type {
1493 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1494 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1495 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1496 _ => {} }
1498 }
1499
1500 if !exec_inst.is_empty() {
1501 params.exec_inst(exec_inst);
1502 }
1503
1504 if let Some(contingency_type) = contingency_type {
1505 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1506 params.contingency_type(bitmex_contingency);
1507 }
1508
1509 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1510
1511 let response = self.inner.place_order(params).await?;
1512
1513 let order: BitmexOrder = serde_json::from_value(response)?;
1514
1515 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1516 let reason = order
1517 .ord_rej_reason
1518 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1519 anyhow::bail!("Order rejected: {reason}");
1520 }
1521
1522 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1523 let ts_init = self.generate_ts_init();
1524
1525 parse_order_status_report(&order, &instrument, ts_init)
1526 }
1527
1528 pub async fn cancel_order(
1538 &self,
1539 instrument_id: InstrumentId,
1540 client_order_id: Option<ClientOrderId>,
1541 venue_order_id: Option<VenueOrderId>,
1542 ) -> anyhow::Result<OrderStatusReport> {
1543 let mut params = super::query::DeleteOrderParamsBuilder::default();
1544 params.text(NAUTILUS_TRADER);
1545
1546 if let Some(venue_order_id) = venue_order_id {
1547 params.order_id(vec![venue_order_id.as_str().to_string()]);
1548 } else if let Some(client_order_id) = client_order_id {
1549 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1550 } else {
1551 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1552 }
1553
1554 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1555
1556 let response = self.inner.cancel_orders(params).await?;
1557
1558 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1559 let order = orders
1560 .into_iter()
1561 .next()
1562 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1563
1564 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1565 let ts_init = self.generate_ts_init();
1566
1567 parse_order_status_report(&order, &instrument, ts_init)
1568 }
1569
1570 pub async fn cancel_orders(
1580 &self,
1581 instrument_id: InstrumentId,
1582 client_order_ids: Option<Vec<ClientOrderId>>,
1583 venue_order_ids: Option<Vec<VenueOrderId>>,
1584 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1585 let mut params = super::query::DeleteOrderParamsBuilder::default();
1586 params.text(NAUTILUS_TRADER);
1587
1588 if let Some(venue_order_ids) = venue_order_ids {
1591 if venue_order_ids.is_empty() {
1592 anyhow::bail!("venue_order_ids cannot be empty");
1593 }
1594 params.order_id(
1595 venue_order_ids
1596 .iter()
1597 .map(|id| id.to_string())
1598 .collect::<Vec<_>>(),
1599 );
1600 } else if let Some(client_order_ids) = client_order_ids {
1601 if client_order_ids.is_empty() {
1602 anyhow::bail!("client_order_ids cannot be empty");
1603 }
1604 params.cl_ord_id(
1605 client_order_ids
1606 .iter()
1607 .map(|id| id.to_string())
1608 .collect::<Vec<_>>(),
1609 );
1610 } else {
1611 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1612 }
1613
1614 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1615
1616 let response = self.inner.cancel_orders(params).await?;
1617
1618 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1619
1620 let ts_init = self.generate_ts_init();
1621 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1622
1623 let mut reports = Vec::new();
1624
1625 for order in orders {
1626 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1627 }
1628
1629 Self::populate_linked_order_ids(&mut reports);
1630
1631 Ok(reports)
1632 }
1633
1634 pub async fn cancel_all_orders(
1644 &self,
1645 instrument_id: InstrumentId,
1646 order_side: Option<OrderSide>,
1647 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1648 let mut params = DeleteAllOrdersParamsBuilder::default();
1649 params.text(NAUTILUS_TRADER);
1650 params.symbol(instrument_id.symbol.as_str());
1651
1652 if let Some(side) = order_side {
1653 let side = BitmexSide::try_from_order_side(side)?;
1654 params.filter(serde_json::json!({
1655 "side": side
1656 }));
1657 }
1658
1659 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1660
1661 let response = self.inner.cancel_all_orders(params).await?;
1662
1663 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1664
1665 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1666 let ts_init = self.generate_ts_init();
1667
1668 let mut reports = Vec::new();
1669
1670 for order in orders {
1671 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1672 }
1673
1674 Self::populate_linked_order_ids(&mut reports);
1675
1676 Ok(reports)
1677 }
1678
1679 pub async fn modify_order(
1690 &self,
1691 instrument_id: InstrumentId,
1692 client_order_id: Option<ClientOrderId>,
1693 venue_order_id: Option<VenueOrderId>,
1694 quantity: Option<Quantity>,
1695 price: Option<Price>,
1696 trigger_price: Option<Price>,
1697 ) -> anyhow::Result<OrderStatusReport> {
1698 let mut params = PutOrderParamsBuilder::default();
1699 params.text(NAUTILUS_TRADER);
1700
1701 if let Some(venue_order_id) = venue_order_id {
1703 params.order_id(venue_order_id.as_str());
1704 } else if let Some(client_order_id) = client_order_id {
1705 params.orig_cl_ord_id(client_order_id.as_str());
1706 } else {
1707 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1708 }
1709
1710 if let Some(quantity) = quantity {
1711 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1712 params.order_qty(quantity_to_u32(&quantity, &instrument));
1713 }
1714
1715 if let Some(price) = price {
1716 params.price(price.as_f64());
1717 }
1718
1719 if let Some(trigger_price) = trigger_price {
1720 params.stop_px(trigger_price.as_f64());
1721 }
1722
1723 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1724
1725 let response = self.inner.amend_order(params).await?;
1726
1727 let order: BitmexOrder = serde_json::from_value(response)?;
1728
1729 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1730 let reason = order
1731 .ord_rej_reason
1732 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1733 anyhow::bail!("Order modification rejected: {reason}");
1734 }
1735
1736 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1737 let ts_init = self.generate_ts_init();
1738
1739 parse_order_status_report(&order, &instrument, ts_init)
1740 }
1741
1742 pub async fn query_order(
1751 &self,
1752 instrument_id: InstrumentId,
1753 client_order_id: Option<ClientOrderId>,
1754 venue_order_id: Option<VenueOrderId>,
1755 ) -> anyhow::Result<Option<OrderStatusReport>> {
1756 let mut params = GetOrderParamsBuilder::default();
1757
1758 let filter_json = if let Some(client_order_id) = client_order_id {
1759 serde_json::json!({
1760 "clOrdID": client_order_id.to_string()
1761 })
1762 } else if let Some(venue_order_id) = venue_order_id {
1763 serde_json::json!({
1764 "orderID": venue_order_id.to_string()
1765 })
1766 } else {
1767 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1768 };
1769
1770 params.filter(filter_json);
1771 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775 let response = self.inner.get_orders(params).await?;
1776
1777 if response.is_empty() {
1778 return Ok(None);
1779 }
1780
1781 let order = &response[0];
1782
1783 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1784 let ts_init = self.generate_ts_init();
1785
1786 let report = parse_order_status_report(order, &instrument, ts_init)?;
1787
1788 Ok(Some(report))
1789 }
1790
1791 pub async fn request_order_status_report(
1800 &self,
1801 instrument_id: InstrumentId,
1802 client_order_id: Option<ClientOrderId>,
1803 venue_order_id: Option<VenueOrderId>,
1804 ) -> anyhow::Result<OrderStatusReport> {
1805 let mut params = GetOrderParamsBuilder::default();
1806 params.symbol(instrument_id.symbol.as_str());
1807
1808 if let Some(venue_order_id) = venue_order_id {
1809 params.filter(serde_json::json!({
1810 "orderID": venue_order_id.as_str()
1811 }));
1812 } else if let Some(client_order_id) = client_order_id {
1813 params.filter(serde_json::json!({
1814 "clOrdID": client_order_id.as_str()
1815 }));
1816 }
1817
1818 params.count(1i32);
1819 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1820
1821 let response = self.inner.get_orders(params).await?;
1822
1823 let order = response
1824 .into_iter()
1825 .next()
1826 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1827
1828 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1829 let ts_init = self.generate_ts_init();
1830
1831 parse_order_status_report(&order, &instrument, ts_init)
1832 }
1833
1834 pub async fn request_order_status_reports(
1843 &self,
1844 instrument_id: Option<InstrumentId>,
1845 open_only: bool,
1846 limit: Option<u32>,
1847 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1848 let mut params = GetOrderParamsBuilder::default();
1849
1850 if let Some(instrument_id) = &instrument_id {
1851 params.symbol(instrument_id.symbol.as_str());
1852 }
1853
1854 if open_only {
1855 params.filter(serde_json::json!({
1856 "open": true
1857 }));
1858 }
1859
1860 if let Some(limit) = limit {
1861 params.count(limit as i32);
1862 } else {
1863 params.count(500); }
1865
1866 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1869
1870 let response = self.inner.get_orders(params).await?;
1871
1872 let ts_init = self.generate_ts_init();
1873
1874 let mut reports = Vec::new();
1875
1876 for order in response {
1877 let Some(symbol) = order.symbol else {
1879 tracing::warn!("Order response missing symbol, skipping");
1880 continue;
1881 };
1882
1883 let Ok(instrument) = self.instrument_from_cache(symbol) else {
1884 tracing::debug!(
1885 symbol = %symbol,
1886 "Skipping order report for instrument not in cache"
1887 );
1888 continue;
1889 };
1890
1891 match parse_order_status_report(&order, &instrument, ts_init) {
1892 Ok(report) => reports.push(report),
1893 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1894 }
1895 }
1896
1897 Self::populate_linked_order_ids(&mut reports);
1898
1899 Ok(reports)
1900 }
1901
1902 pub async fn request_trades(
1908 &self,
1909 instrument_id: InstrumentId,
1910 start: Option<DateTime<Utc>>,
1911 end: Option<DateTime<Utc>>,
1912 limit: Option<u32>,
1913 ) -> anyhow::Result<Vec<TradeTick>> {
1914 let mut params = GetTradeParamsBuilder::default();
1915 params.symbol(instrument_id.symbol.as_str());
1916
1917 if let Some(start) = start {
1918 params.start_time(start);
1919 }
1920
1921 if let Some(end) = end {
1922 params.end_time(end);
1923 }
1924
1925 if let (Some(start), Some(end)) = (start, end) {
1926 anyhow::ensure!(
1927 start < end,
1928 "Invalid time range: start={start:?} end={end:?}",
1929 );
1930 }
1931
1932 if let Some(limit) = limit {
1933 let clamped_limit = limit.min(1000);
1934 if limit > 1000 {
1935 tracing::warn!(
1936 limit,
1937 clamped_limit,
1938 "BitMEX trade request limit exceeds venue maximum; clamping",
1939 );
1940 }
1941 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1942 }
1943 params.reverse(false);
1944 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1945
1946 let response = self.inner.get_trades(params).await?;
1947
1948 let ts_init = self.generate_ts_init();
1949
1950 let mut parsed_trades = Vec::new();
1951
1952 for trade in response {
1953 if let Some(start) = start
1954 && trade.timestamp < start
1955 {
1956 continue;
1957 }
1958
1959 if let Some(end) = end
1960 && trade.timestamp > end
1961 {
1962 continue;
1963 }
1964
1965 let price_precision = self.get_price_precision(trade.symbol)?;
1966
1967 match parse_trade(trade, price_precision, ts_init) {
1968 Ok(trade) => parsed_trades.push(trade),
1969 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1970 }
1971 }
1972
1973 Ok(parsed_trades)
1974 }
1975
1976 pub async fn request_bars(
1983 &self,
1984 mut bar_type: BarType,
1985 start: Option<DateTime<Utc>>,
1986 end: Option<DateTime<Utc>>,
1987 limit: Option<u32>,
1988 partial: bool,
1989 ) -> anyhow::Result<Vec<Bar>> {
1990 bar_type = bar_type.standard();
1991
1992 anyhow::ensure!(
1993 bar_type.aggregation_source() == AggregationSource::External,
1994 "Only EXTERNAL aggregation bars are supported"
1995 );
1996 anyhow::ensure!(
1997 bar_type.spec().price_type == PriceType::Last,
1998 "Only LAST price type bars are supported"
1999 );
2000 if let (Some(start), Some(end)) = (start, end) {
2001 anyhow::ensure!(
2002 start < end,
2003 "Invalid time range: start={start:?} end={end:?}"
2004 );
2005 }
2006
2007 let spec = bar_type.spec();
2008 let bin_size = match (spec.aggregation, spec.step.get()) {
2009 (BarAggregation::Minute, 1) => "1m",
2010 (BarAggregation::Minute, 5) => "5m",
2011 (BarAggregation::Hour, 1) => "1h",
2012 (BarAggregation::Day, 1) => "1d",
2013 _ => anyhow::bail!(
2014 "BitMEX does not support {}-{:?}-{:?} bars",
2015 spec.step.get(),
2016 spec.aggregation,
2017 spec.price_type,
2018 ),
2019 };
2020
2021 let instrument_id = bar_type.instrument_id();
2022 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2023
2024 let mut params = GetTradeBucketedParamsBuilder::default();
2025 params.symbol(instrument_id.symbol.as_str());
2026 params.bin_size(bin_size);
2027 if partial {
2028 params.partial(true);
2029 }
2030 if let Some(start) = start {
2031 params.start_time(start);
2032 }
2033 if let Some(end) = end {
2034 params.end_time(end);
2035 }
2036 if let Some(limit) = limit {
2037 let clamped_limit = limit.min(1000);
2038 if limit > 1000 {
2039 tracing::warn!(
2040 limit,
2041 clamped_limit,
2042 "BitMEX bar request limit exceeds venue maximum; clamping",
2043 );
2044 }
2045 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2046 }
2047 params.reverse(false);
2048 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2049
2050 let response = self.inner.get_trade_bucketed(params).await?;
2051 let ts_init = self.generate_ts_init();
2052 let mut bars = Vec::new();
2053
2054 for bin in response {
2055 if let Some(start) = start
2056 && bin.timestamp < start
2057 {
2058 continue;
2059 }
2060 if let Some(end) = end
2061 && bin.timestamp > end
2062 {
2063 continue;
2064 }
2065 if bin.symbol != instrument_id.symbol.inner() {
2066 tracing::warn!(
2067 symbol = %bin.symbol,
2068 expected = %instrument_id.symbol,
2069 "Skipping trade bin for unexpected symbol",
2070 );
2071 continue;
2072 }
2073
2074 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2075 Ok(bar) => bars.push(bar),
2076 Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2077 }
2078 }
2079
2080 Ok(bars)
2081 }
2082
2083 pub async fn request_fill_reports(
2089 &self,
2090 instrument_id: Option<InstrumentId>,
2091 limit: Option<u32>,
2092 ) -> anyhow::Result<Vec<FillReport>> {
2093 let mut params = GetExecutionParamsBuilder::default();
2094 if let Some(instrument_id) = instrument_id {
2095 params.symbol(instrument_id.symbol.as_str());
2096 }
2097 if let Some(limit) = limit {
2098 params.count(limit as i32);
2099 } else {
2100 params.count(500); }
2102 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2105
2106 let response = self.inner.get_executions(params).await?;
2107
2108 let ts_init = self.generate_ts_init();
2109
2110 let mut reports = Vec::new();
2111
2112 for exec in response {
2113 let Some(symbol) = exec.symbol else {
2115 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2116 continue;
2117 };
2118 let symbol_str = symbol.to_string();
2119
2120 let instrument = match self.instrument_from_cache(symbol) {
2121 Ok(instrument) => instrument,
2122 Err(e) => {
2123 tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2124 continue;
2125 }
2126 };
2127
2128 match parse_fill_report(exec, &instrument, ts_init) {
2129 Ok(report) => reports.push(report),
2130 Err(e) => {
2131 let error_msg = e.to_string();
2133 if error_msg.starts_with("Skipping non-trade execution")
2134 || error_msg.starts_with("Skipping execution without order_id")
2135 {
2136 tracing::debug!("{e}");
2137 } else {
2138 tracing::error!("Failed to parse fill report: {e}");
2139 }
2140 }
2141 }
2142 }
2143
2144 Ok(reports)
2145 }
2146
2147 pub async fn request_position_status_reports(
2153 &self,
2154 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2155 let params = GetPositionParamsBuilder::default()
2156 .count(500) .build()
2158 .map_err(|e| anyhow::anyhow!(e))?;
2159
2160 let response = self.inner.get_positions(params).await?;
2161
2162 let ts_init = self.generate_ts_init();
2163
2164 let mut reports = Vec::new();
2165
2166 for pos in response {
2167 let symbol = pos.symbol;
2168 let instrument = match self.instrument_from_cache(symbol) {
2169 Ok(instrument) => instrument,
2170 Err(e) => {
2171 tracing::error!(
2172 symbol = pos.symbol.as_str(),
2173 "Instrument not found in cache for position parsing: {e}"
2174 );
2175 continue;
2176 }
2177 };
2178
2179 match parse_position_report(pos, &instrument, ts_init) {
2180 Ok(report) => reports.push(report),
2181 Err(e) => tracing::error!("Failed to parse position report: {e}"),
2182 }
2183 }
2184
2185 Ok(reports)
2186 }
2187
2188 pub async fn update_position_leverage(
2196 &self,
2197 symbol: &str,
2198 leverage: f64,
2199 ) -> anyhow::Result<PositionStatusReport> {
2200 let params = PostPositionLeverageParams {
2201 symbol: symbol.to_string(),
2202 leverage,
2203 target_account_id: None,
2204 };
2205
2206 let response = self.inner.update_position_leverage(params).await?;
2207
2208 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2209 let ts_init = self.generate_ts_init();
2210
2211 parse_position_report(response, &instrument, ts_init)
2212 }
2213}
2214
2215#[cfg(test)]
2216mod tests {
2217 use nautilus_core::UUID4;
2218 use nautilus_model::enums::OrderStatus;
2219 use rstest::rstest;
2220 use serde_json::json;
2221
2222 use super::*;
2223
2224 fn build_report(
2225 client_order_id: &str,
2226 venue_order_id: &str,
2227 contingency_type: ContingencyType,
2228 order_list_id: Option<&str>,
2229 ) -> OrderStatusReport {
2230 let mut report = OrderStatusReport::new(
2231 AccountId::from("BITMEX-1"),
2232 InstrumentId::from("XBTUSD.BITMEX"),
2233 Some(ClientOrderId::from(client_order_id)),
2234 VenueOrderId::from(venue_order_id),
2235 OrderSide::Buy,
2236 OrderType::Limit,
2237 TimeInForce::Gtc,
2238 OrderStatus::Accepted,
2239 Quantity::new(100.0, 0),
2240 Quantity::default(),
2241 UnixNanos::from(1_u64),
2242 UnixNanos::from(1_u64),
2243 UnixNanos::from(1_u64),
2244 Some(UUID4::new()),
2245 );
2246
2247 if let Some(id) = order_list_id {
2248 report = report.with_order_list_id(OrderListId::from(id));
2249 }
2250
2251 report.with_contingency_type(contingency_type)
2252 }
2253
2254 #[rstest]
2255 fn test_sign_request_generates_correct_headers() {
2256 let client = BitmexRawHttpClient::with_credentials(
2257 "test_api_key".to_string(),
2258 "test_api_secret".to_string(),
2259 "http://localhost:8080".to_string(),
2260 Some(60),
2261 None, None, None, None, None, None, None, )
2269 .expect("Failed to create test client");
2270
2271 let headers = client
2272 .sign_request(&Method::GET, "/api/v1/order", None)
2273 .unwrap();
2274
2275 assert!(headers.contains_key("api-key"));
2276 assert!(headers.contains_key("api-signature"));
2277 assert!(headers.contains_key("api-expires"));
2278 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2279 }
2280
2281 #[rstest]
2282 fn test_sign_request_with_body() {
2283 let client = BitmexRawHttpClient::with_credentials(
2284 "test_api_key".to_string(),
2285 "test_api_secret".to_string(),
2286 "http://localhost:8080".to_string(),
2287 Some(60),
2288 None, None, None, None, None, None, None, )
2296 .expect("Failed to create test client");
2297
2298 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2299 let body_bytes = serde_json::to_vec(&body).unwrap();
2300
2301 let headers_without_body = client
2302 .sign_request(&Method::POST, "/api/v1/order", None)
2303 .unwrap();
2304 let headers_with_body = client
2305 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2306 .unwrap();
2307
2308 assert_ne!(
2310 headers_without_body.get("api-signature").unwrap(),
2311 headers_with_body.get("api-signature").unwrap()
2312 );
2313 }
2314
2315 #[rstest]
2316 fn test_sign_request_uses_custom_recv_window() {
2317 let client_default = BitmexRawHttpClient::with_credentials(
2318 "test_api_key".to_string(),
2319 "test_api_secret".to_string(),
2320 "http://localhost:8080".to_string(),
2321 Some(60),
2322 None,
2323 None,
2324 None,
2325 None, None, None, None, )
2330 .expect("Failed to create test client");
2331
2332 let client_custom = BitmexRawHttpClient::with_credentials(
2333 "test_api_key".to_string(),
2334 "test_api_secret".to_string(),
2335 "http://localhost:8080".to_string(),
2336 Some(60),
2337 None,
2338 None,
2339 None,
2340 Some(30_000), None, None, None, )
2345 .expect("Failed to create test client");
2346
2347 let headers_default = client_default
2348 .sign_request(&Method::GET, "/api/v1/order", None)
2349 .unwrap();
2350 let headers_custom = client_custom
2351 .sign_request(&Method::GET, "/api/v1/order", None)
2352 .unwrap();
2353
2354 let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2356 let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2357
2358 let now = Utc::now().timestamp();
2360 assert!(expires_default > now);
2361 assert!(expires_custom > now);
2362
2363 assert!(expires_custom > expires_default);
2365
2366 let diff = expires_custom - expires_default;
2369 assert!((18..=25).contains(&diff));
2370 }
2371
2372 #[rstest]
2373 fn test_populate_linked_order_ids_from_order_list() {
2374 let base = "O-20250922-002219-001-000";
2375 let entry = format!("{base}-1");
2376 let stop = format!("{base}-2");
2377 let take = format!("{base}-3");
2378
2379 let mut reports = vec![
2380 build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2381 build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2382 build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2383 ];
2384
2385 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2386
2387 assert_eq!(
2388 reports[0].linked_order_ids,
2389 Some(vec![
2390 ClientOrderId::from(stop.as_str()),
2391 ClientOrderId::from(take.as_str()),
2392 ]),
2393 );
2394 assert_eq!(
2395 reports[1].linked_order_ids,
2396 Some(vec![
2397 ClientOrderId::from(entry.as_str()),
2398 ClientOrderId::from(take.as_str()),
2399 ]),
2400 );
2401 assert_eq!(
2402 reports[2].linked_order_ids,
2403 Some(vec![
2404 ClientOrderId::from(entry.as_str()),
2405 ClientOrderId::from(stop.as_str()),
2406 ]),
2407 );
2408 }
2409
2410 #[rstest]
2411 fn test_populate_linked_order_ids_from_id_prefix() {
2412 let base = "O-20250922-002220-001-000";
2413 let entry = format!("{base}-1");
2414 let stop = format!("{base}-2");
2415 let take = format!("{base}-3");
2416
2417 let mut reports = vec![
2418 build_report(&entry, "V-1", ContingencyType::Oto, None),
2419 build_report(&stop, "V-2", ContingencyType::Ouo, None),
2420 build_report(&take, "V-3", ContingencyType::Ouo, None),
2421 ];
2422
2423 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2424
2425 assert_eq!(
2426 reports[0].linked_order_ids,
2427 Some(vec![
2428 ClientOrderId::from(stop.as_str()),
2429 ClientOrderId::from(take.as_str()),
2430 ]),
2431 );
2432 assert_eq!(
2433 reports[1].linked_order_ids,
2434 Some(vec![
2435 ClientOrderId::from(entry.as_str()),
2436 ClientOrderId::from(take.as_str()),
2437 ]),
2438 );
2439 assert_eq!(
2440 reports[2].linked_order_ids,
2441 Some(vec![
2442 ClientOrderId::from(entry.as_str()),
2443 ClientOrderId::from(stop.as_str()),
2444 ]),
2445 );
2446 }
2447
2448 #[rstest]
2449 fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2450 let base = "O-20250922-002221-001-000";
2451 let entry = format!("{base}-1");
2452 let passive = format!("{base}-2");
2453
2454 let mut reports = vec![
2455 build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2456 build_report(&passive, "V-2", ContingencyType::Ouo, None),
2457 ];
2458
2459 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2460
2461 assert!(reports[0].linked_order_ids.is_none());
2463
2464 assert!(reports[1].linked_order_ids.is_none());
2466 assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2467 }
2468}