1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{Arc, Mutex},
29};
30
31use ahash::AHashMap;
32use chrono::{DateTime, Utc};
33use nautilus_core::{
34 MUTEX_POISONED, UnixNanos,
35 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
36 env::get_env_var,
37 time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40 data::{Bar, BarType, TradeTick},
41 enums::{
42 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
43 TimeInForce, TriggerType,
44 },
45 events::AccountState,
46 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
47 instruments::{Instrument as InstrumentTrait, InstrumentAny},
48 reports::{FillReport, OrderStatusReport, PositionStatusReport},
49 types::{Price, Quantity},
50};
51use nautilus_network::{
52 http::HttpClient,
53 ratelimiter::quota::Quota,
54 retry::{RetryConfig, RetryManager},
55};
56use reqwest::{Method, StatusCode, header::USER_AGENT};
57use serde::{Deserialize, Serialize, de::DeserializeOwned};
58use serde_json::Value;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use super::{
63 error::{BitmexErrorResponse, BitmexHttpError},
64 models::{
65 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
66 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
67 },
68 query::{
69 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
70 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
71 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
72 PostPositionLeverageParams, PutOrderParams,
73 },
74};
75use crate::{
76 common::{
77 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
78 credential::Credential,
79 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
80 parse::{parse_account_state, quantity_to_u32},
81 },
82 http::{
83 parse::{
84 parse_fill_report, parse_instrument_any, parse_order_status_report,
85 parse_position_report, parse_trade, parse_trade_bin,
86 },
87 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
88 },
89 websocket::messages::BitmexMarginMsg,
90};
91
92const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
98const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
99const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
100
101const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
102const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
103
104#[derive(Debug, Serialize, Deserialize)]
106pub struct BitmexResponse<T> {
107 pub data: Vec<T>,
109}
110
111#[derive(Debug, Clone)]
131pub struct BitmexHttpInnerClient {
132 base_url: String,
133 client: HttpClient,
134 credential: Option<Credential>,
135 recv_window_ms: u64,
136 retry_manager: RetryManager<BitmexHttpError>,
137 cancellation_token: CancellationToken,
138}
139
140impl Default for BitmexHttpInnerClient {
141 fn default() -> Self {
142 Self::new(None, Some(60), None, None, None, None, None, None)
143 .expect("Failed to create default BitmexHttpInnerClient")
144 }
145}
146
147impl BitmexHttpInnerClient {
148 pub fn cancel_all_requests(&self) {
150 self.cancellation_token.cancel();
151 }
152
153 pub fn cancellation_token(&self) -> &CancellationToken {
155 &self.cancellation_token
156 }
157 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 base_url: Option<String>,
169 timeout_secs: Option<u64>,
170 max_retries: Option<u32>,
171 retry_delay_ms: Option<u64>,
172 retry_delay_max_ms: Option<u64>,
173 recv_window_ms: Option<u64>,
174 max_requests_per_second: Option<u32>,
175 max_requests_per_minute: Option<u32>,
176 ) -> Result<Self, BitmexHttpError> {
177 let retry_config = RetryConfig {
178 max_retries: max_retries.unwrap_or(3),
179 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
180 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
181 backoff_factor: 2.0,
182 jitter_ms: 1000,
183 operation_timeout_ms: Some(60_000),
184 immediate_first: false,
185 max_elapsed_ms: Some(180_000),
186 };
187
188 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
189 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
190 })?;
191
192 let max_req_per_sec =
193 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
194 let max_req_per_min =
195 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
196
197 Ok(Self {
198 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
199 client: HttpClient::new(
200 Self::default_headers(),
201 vec![],
202 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
203 Some(Self::default_quota(max_req_per_sec)),
204 timeout_secs,
205 ),
206 credential: None,
207 recv_window_ms: recv_window_ms.unwrap_or(10_000),
208 retry_manager,
209 cancellation_token: CancellationToken::new(),
210 })
211 }
212
213 #[allow(clippy::too_many_arguments)]
220 pub fn with_credentials(
221 api_key: String,
222 api_secret: String,
223 base_url: String,
224 timeout_secs: Option<u64>,
225 max_retries: Option<u32>,
226 retry_delay_ms: Option<u64>,
227 retry_delay_max_ms: Option<u64>,
228 recv_window_ms: Option<u64>,
229 max_requests_per_second: Option<u32>,
230 max_requests_per_minute: Option<u32>,
231 ) -> Result<Self, BitmexHttpError> {
232 let retry_config = RetryConfig {
233 max_retries: max_retries.unwrap_or(3),
234 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
235 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
236 backoff_factor: 2.0,
237 jitter_ms: 1000,
238 operation_timeout_ms: Some(60_000),
239 immediate_first: false,
240 max_elapsed_ms: Some(180_000),
241 };
242
243 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
244 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
245 })?;
246
247 let max_req_per_sec =
248 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
249 let max_req_per_min =
250 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
251
252 Ok(Self {
253 base_url,
254 client: HttpClient::new(
255 Self::default_headers(),
256 vec![],
257 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
258 Some(Self::default_quota(max_req_per_sec)),
259 timeout_secs,
260 ),
261 credential: Some(Credential::new(api_key, api_secret)),
262 recv_window_ms: recv_window_ms.unwrap_or(10_000),
263 retry_manager,
264 cancellation_token: CancellationToken::new(),
265 })
266 }
267
268 fn default_headers() -> HashMap<String, String> {
269 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270 }
271
272 fn default_quota(max_requests_per_second: u32) -> Quota {
273 Quota::per_second(
274 NonZeroU32::new(max_requests_per_second)
275 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276 )
277 }
278
279 fn rate_limiter_quotas(
280 max_requests_per_second: u32,
281 max_requests_per_minute: u32,
282 ) -> Vec<(String, Quota)> {
283 let per_sec_quota = Quota::per_second(
284 NonZeroU32::new(max_requests_per_second)
285 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286 );
287 let per_min_quota =
288 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290 }));
291
292 vec![
293 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295 ]
296 }
297
298 fn rate_limit_keys() -> Vec<Ustr> {
299 vec![
300 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301 Ustr::from(BITMEX_MINUTE_RATE_KEY),
302 ]
303 }
304
305 fn sign_request(
306 &self,
307 method: &Method,
308 endpoint: &str,
309 body: Option<&[u8]>,
310 ) -> Result<HashMap<String, String>, BitmexHttpError> {
311 let credential = self
312 .credential
313 .as_ref()
314 .ok_or(BitmexHttpError::MissingCredentials)?;
315
316 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
317 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
318
319 let full_path = if endpoint.starts_with("/api/v1") {
320 endpoint.to_string()
321 } else {
322 format!("/api/v1{endpoint}")
323 };
324
325 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
326
327 let mut headers = HashMap::new();
328 headers.insert("api-expires".to_string(), expires.to_string());
329 headers.insert("api-key".to_string(), credential.api_key.to_string());
330 headers.insert("api-signature".to_string(), signature);
331
332 if body.is_some()
334 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
335 {
336 headers.insert(
337 "Content-Type".to_string(),
338 "application/x-www-form-urlencoded".to_string(),
339 );
340 }
341
342 Ok(headers)
343 }
344
345 async fn send_request<T: DeserializeOwned>(
346 &self,
347 method: Method,
348 endpoint: &str,
349 body: Option<Vec<u8>>,
350 authenticate: bool,
351 ) -> Result<T, BitmexHttpError> {
352 let endpoint = endpoint.to_string();
353 let url = format!("{}{endpoint}", self.base_url);
354 let method_clone = method.clone();
355 let body_clone = body.clone();
356
357 let operation = || {
358 let url = url.clone();
359 let method = method_clone.clone();
360 let body = body_clone.clone();
361 let endpoint = endpoint.clone();
362
363 async move {
364 let headers = if authenticate {
365 Some(self.sign_request(&method, endpoint.as_str(), body.as_deref())?)
366 } else {
367 None
368 };
369
370 let rate_keys = Self::rate_limit_keys();
371 let resp = self
372 .client
373 .request_with_ustr_keys(method, url, headers, body, None, Some(rate_keys))
374 .await?;
375
376 if resp.status.is_success() {
377 serde_json::from_slice(&resp.body).map_err(Into::into)
378 } else if let Ok(error_resp) =
379 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
380 {
381 Err(error_resp.into())
382 } else {
383 Err(BitmexHttpError::UnexpectedStatus {
384 status: StatusCode::from_u16(resp.status.as_u16())
385 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
386 body: String::from_utf8_lossy(&resp.body).to_string(),
387 })
388 }
389 }
390 };
391
392 let should_retry = |error: &BitmexHttpError| -> bool {
409 match error {
410 BitmexHttpError::NetworkError(_) => true,
411 BitmexHttpError::UnexpectedStatus { status, .. } => {
412 status.as_u16() >= 500 || status.as_u16() == 429
413 }
414 BitmexHttpError::BitmexError {
415 error_name,
416 message,
417 } => {
418 error_name == "RateLimitError"
419 || (error_name == "HTTPError"
420 && message.to_lowercase().contains("rate limit"))
421 }
422 _ => false,
423 }
424 };
425
426 let create_error = |msg: String| -> BitmexHttpError {
427 if msg == "canceled" {
428 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
429 } else {
430 BitmexHttpError::NetworkError(msg)
431 }
432 };
433
434 self.retry_manager
435 .execute_with_retry_with_cancel(
436 endpoint.as_str(),
437 operation,
438 should_retry,
439 create_error,
440 &self.cancellation_token,
441 )
442 .await
443 }
444
445 pub async fn http_get_instruments(
451 &self,
452 active_only: bool,
453 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
454 let path = if active_only {
455 "/instrument/active"
456 } else {
457 "/instrument"
458 };
459 self.send_request(Method::GET, path, None, false).await
460 }
461
462 pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
472 let response: BitmexApiInfo = self.send_request(Method::GET, "", None, false).await?;
473 Ok(response.timestamp)
474 }
475
476 pub async fn http_get_instrument(
487 &self,
488 symbol: &str,
489 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
490 let path = &format!("/instrument?symbol={symbol}");
491 let instruments: Vec<BitmexInstrument> =
492 self.send_request(Method::GET, path, None, false).await?;
493
494 Ok(instruments.into_iter().next())
495 }
496
497 pub async fn http_get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
503 let endpoint = "/user/wallet";
504 self.send_request(Method::GET, endpoint, None, true).await
505 }
506
507 pub async fn http_get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
513 let path = format!("/user/margin?currency={currency}");
514 self.send_request(Method::GET, &path, None, true).await
515 }
516
517 pub async fn http_get_trades(
527 &self,
528 params: GetTradeParams,
529 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
530 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
531 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
532 })?;
533 let path = format!("/trade?{query}");
534 self.send_request(Method::GET, &path, None, true).await
535 }
536
537 pub async fn http_get_trade_bucketed(
543 &self,
544 params: GetTradeBucketedParams,
545 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
546 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
547 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
548 })?;
549 let path = format!("/trade/bucketed?{query}");
550 self.send_request(Method::GET, &path, None, true).await
551 }
552
553 pub async fn http_get_orders(
563 &self,
564 params: GetOrderParams,
565 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
566 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
567 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
568 })?;
569 let path = format!("/order?{query}");
570 self.send_request(Method::GET, &path, None, true).await
571 }
572
573 pub async fn http_place_order(
583 &self,
584 params: PostOrderParams,
585 ) -> Result<Value, BitmexHttpError> {
586 let body = serde_urlencoded::to_string(¶ms)
588 .map_err(|e| {
589 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
590 })?
591 .into_bytes();
592 let path = "/order";
593 self.send_request(Method::POST, path, Some(body), true)
594 .await
595 }
596
597 pub async fn http_cancel_orders(
607 &self,
608 params: DeleteOrderParams,
609 ) -> Result<Value, BitmexHttpError> {
610 let body = serde_urlencoded::to_string(¶ms)
612 .map_err(|e| {
613 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
614 })?
615 .into_bytes();
616 let path = "/order";
617 self.send_request(Method::DELETE, path, Some(body), true)
618 .await
619 }
620
621 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
631 let body = serde_urlencoded::to_string(¶ms)
633 .map_err(|e| {
634 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
635 })?
636 .into_bytes();
637 let path = "/order";
638 self.send_request(Method::PUT, path, Some(body), true).await
639 }
640
641 pub async fn http_cancel_all_orders(
655 &self,
656 params: DeleteAllOrdersParams,
657 ) -> Result<Value, BitmexHttpError> {
658 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
659 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
660 })?;
661 let path = format!("/order/all?{query}");
662 self.send_request(Method::DELETE, &path, None, true).await
663 }
664
665 pub async fn http_get_executions(
675 &self,
676 params: GetExecutionParams,
677 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
678 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
679 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
680 })?;
681 let path = format!("/execution/tradeHistory?{query}");
682 self.send_request(Method::GET, &path, None, true).await
683 }
684
685 pub async fn http_get_positions(
695 &self,
696 params: GetPositionParams,
697 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
698 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
699 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700 })?;
701 let path = format!("/position?{query}");
702 self.send_request(Method::GET, &path, None, true).await
703 }
704
705 pub async fn http_update_position_leverage(
715 &self,
716 params: PostPositionLeverageParams,
717 ) -> Result<BitmexPosition, BitmexHttpError> {
718 let body = serde_urlencoded::to_string(¶ms)
720 .map_err(|e| {
721 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
722 })?
723 .into_bytes();
724 let path = "/position/leverage";
725 self.send_request(Method::POST, path, Some(body), true)
726 .await
727 }
728}
729
730#[derive(Clone, Debug)]
735#[cfg_attr(
736 feature = "python",
737 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
738)]
739pub struct BitmexHttpClient {
740 inner: Arc<BitmexHttpInnerClient>,
741 instruments_cache: Arc<Mutex<AHashMap<Ustr, InstrumentAny>>>,
742}
743
744impl Default for BitmexHttpClient {
745 fn default() -> Self {
746 Self::new(
747 None,
748 None,
749 None,
750 false,
751 Some(60),
752 None,
753 None,
754 None,
755 None,
756 None,
757 None,
758 )
759 .expect("Failed to create default BitmexHttpClient")
760 }
761}
762
763impl BitmexHttpClient {
764 #[allow(clippy::too_many_arguments)]
770 pub fn new(
771 base_url: Option<String>,
772 api_key: Option<String>,
773 api_secret: Option<String>,
774 testnet: bool,
775 timeout_secs: Option<u64>,
776 max_retries: Option<u32>,
777 retry_delay_ms: Option<u64>,
778 retry_delay_max_ms: Option<u64>,
779 recv_window_ms: Option<u64>,
780 max_requests_per_second: Option<u32>,
781 max_requests_per_minute: Option<u32>,
782 ) -> Result<Self, BitmexHttpError> {
783 let url = base_url.unwrap_or_else(|| {
785 if testnet {
786 BITMEX_HTTP_TESTNET_URL.to_string()
787 } else {
788 BITMEX_HTTP_URL.to_string()
789 }
790 });
791
792 let inner = match (api_key, api_secret) {
793 (Some(key), Some(secret)) => BitmexHttpInnerClient::with_credentials(
794 key,
795 secret,
796 url,
797 timeout_secs,
798 max_retries,
799 retry_delay_ms,
800 retry_delay_max_ms,
801 recv_window_ms,
802 max_requests_per_second,
803 max_requests_per_minute,
804 )?,
805 _ => BitmexHttpInnerClient::new(
806 Some(url),
807 timeout_secs,
808 max_retries,
809 retry_delay_ms,
810 retry_delay_max_ms,
811 recv_window_ms,
812 max_requests_per_second,
813 max_requests_per_minute,
814 )?,
815 };
816
817 Ok(Self {
818 inner: Arc::new(inner),
819 instruments_cache: Arc::new(Mutex::new(AHashMap::new())),
820 })
821 }
822
823 pub fn from_env() -> anyhow::Result<Self> {
830 Self::with_credentials(None, None, None, None, None, None, None, None, None, None)
831 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
832 }
833
834 #[allow(clippy::too_many_arguments)]
844 pub fn with_credentials(
845 api_key: Option<String>,
846 api_secret: Option<String>,
847 base_url: Option<String>,
848 timeout_secs: Option<u64>,
849 max_retries: Option<u32>,
850 retry_delay_ms: Option<u64>,
851 retry_delay_max_ms: Option<u64>,
852 recv_window_ms: Option<u64>,
853 max_requests_per_second: Option<u32>,
854 max_requests_per_minute: Option<u32>,
855 ) -> anyhow::Result<Self> {
856 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
858
859 let (key_var, secret_var) = if testnet {
861 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
862 } else {
863 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
864 };
865
866 let api_key = api_key.or_else(|| get_env_var(key_var).ok());
867 let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
868
869 if api_key.is_some() && api_secret.is_none() {
871 anyhow::bail!("{secret_var} is required when {key_var} is provided");
872 }
873 if api_key.is_none() && api_secret.is_some() {
874 anyhow::bail!("{key_var} is required when {secret_var} is provided");
875 }
876
877 Self::new(
878 base_url,
879 api_key,
880 api_secret,
881 testnet,
882 timeout_secs,
883 max_retries,
884 retry_delay_ms,
885 retry_delay_max_ms,
886 recv_window_ms,
887 max_requests_per_second,
888 max_requests_per_minute,
889 )
890 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
891 }
892
893 #[must_use]
895 pub fn base_url(&self) -> &str {
896 self.inner.base_url.as_str()
897 }
898
899 #[must_use]
901 pub fn api_key(&self) -> Option<&str> {
902 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
903 }
904
905 pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
913 self.inner.http_get_server_time().await
914 }
915
916 fn generate_ts_init(&self) -> UnixNanos {
918 get_atomic_clock_realtime().get_time_ns()
919 }
920
921 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
923 matches!(
924 contingency_type,
925 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
926 )
927 }
928
929 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
931 matches!(
932 contingency_type,
933 ContingencyType::Oco | ContingencyType::Oto
934 )
935 }
936
937 fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
939 let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
940 let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
941 let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
942 let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
943
944 for report in reports.iter() {
945 let Some(client_order_id) = report.client_order_id else {
946 continue;
947 };
948
949 if let Some(order_list_id) = report.order_list_id {
950 order_list_groups
951 .entry(order_list_id)
952 .or_default()
953 .push(client_order_id);
954
955 if Self::is_parent_contingency(report.contingency_type) {
956 order_list_parents
957 .entry(order_list_id)
958 .or_insert(client_order_id);
959 }
960 }
961
962 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
963 && Self::is_contingent_order(report.contingency_type)
964 {
965 prefix_groups
966 .entry(base.to_owned())
967 .or_default()
968 .push(client_order_id);
969
970 if Self::is_parent_contingency(report.contingency_type) {
971 prefix_parents
972 .entry(base.to_owned())
973 .or_insert(client_order_id);
974 }
975 }
976 }
977
978 for report in reports.iter_mut() {
979 let Some(client_order_id) = report.client_order_id else {
980 continue;
981 };
982
983 if report.linked_order_ids.is_some() {
984 continue;
985 }
986
987 if !Self::is_contingent_order(report.contingency_type) {
989 continue;
990 }
991
992 if let Some(order_list_id) = report.order_list_id
993 && let Some(group) = order_list_groups.get(&order_list_id)
994 {
995 let mut linked: Vec<ClientOrderId> = group
996 .iter()
997 .copied()
998 .filter(|candidate| candidate != &client_order_id)
999 .collect();
1000
1001 if !linked.is_empty() {
1002 if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1003 if client_order_id != *parent_id {
1004 linked.sort_by_key(
1005 |candidate| {
1006 if candidate == parent_id { 0 } else { 1 }
1007 },
1008 );
1009 report.parent_order_id = Some(*parent_id);
1010 } else {
1011 report.parent_order_id = None;
1012 }
1013 } else {
1014 report.parent_order_id = None;
1015 }
1016
1017 tracing::trace!(
1018 client_order_id = ?client_order_id,
1019 order_list_id = ?order_list_id,
1020 contingency_type = ?report.contingency_type,
1021 linked_order_ids = ?linked,
1022 "BitMEX linked ids sourced from order list id",
1023 );
1024 report.linked_order_ids = Some(linked);
1025 continue;
1026 }
1027
1028 tracing::trace!(
1029 client_order_id = ?client_order_id,
1030 order_list_id = ?order_list_id,
1031 contingency_type = ?report.contingency_type,
1032 order_list_group = ?group,
1033 "BitMEX order list id group had no peers",
1034 );
1035 report.parent_order_id = None;
1036 } else if report.order_list_id.is_none() {
1037 report.parent_order_id = None;
1038 }
1039
1040 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1041 && let Some(group) = prefix_groups.get(base)
1042 {
1043 let mut linked: Vec<ClientOrderId> = group
1044 .iter()
1045 .copied()
1046 .filter(|candidate| candidate != &client_order_id)
1047 .collect();
1048
1049 if !linked.is_empty() {
1050 if let Some(parent_id) = prefix_parents.get(base) {
1051 if client_order_id != *parent_id {
1052 linked.sort_by_key(
1053 |candidate| {
1054 if candidate == parent_id { 0 } else { 1 }
1055 },
1056 );
1057 report.parent_order_id = Some(*parent_id);
1058 } else {
1059 report.parent_order_id = None;
1060 }
1061 } else {
1062 report.parent_order_id = None;
1063 }
1064
1065 tracing::trace!(
1066 client_order_id = ?client_order_id,
1067 contingency_type = ?report.contingency_type,
1068 base = base,
1069 linked_order_ids = ?linked,
1070 "BitMEX linked ids constructed from client order id prefix",
1071 );
1072 report.linked_order_ids = Some(linked);
1073 continue;
1074 }
1075
1076 tracing::trace!(
1077 client_order_id = ?client_order_id,
1078 contingency_type = ?report.contingency_type,
1079 base = base,
1080 prefix_group = ?group,
1081 "BitMEX client order id prefix group had no peers",
1082 );
1083 report.parent_order_id = None;
1084 } else if client_order_id.as_str().contains('-') {
1085 report.parent_order_id = None;
1086 }
1087
1088 if Self::is_contingent_order(report.contingency_type) {
1089 tracing::warn!(
1090 client_order_id = ?report.client_order_id,
1091 order_list_id = ?report.order_list_id,
1092 contingency_type = ?report.contingency_type,
1093 "BitMEX order status report missing linked ids after grouping",
1094 );
1095 report.contingency_type = ContingencyType::NoContingency;
1096 report.parent_order_id = None;
1097 }
1098
1099 report.linked_order_ids = None;
1100 }
1101 }
1102
1103 pub fn cancel_all_requests(&self) {
1105 self.inner.cancel_all_requests();
1106 }
1107
1108 pub fn cancellation_token(&self) -> CancellationToken {
1110 self.inner.cancellation_token().clone()
1111 }
1112
1113 pub fn add_instrument(&self, instrument: InstrumentAny) {
1119 self.instruments_cache
1120 .lock()
1121 .unwrap()
1122 .insert(instrument.raw_symbol().inner(), instrument);
1123 }
1124
1125 pub async fn request_instrument(
1133 &self,
1134 instrument_id: InstrumentId,
1135 ) -> anyhow::Result<Option<InstrumentAny>> {
1136 let response = self
1137 .inner
1138 .http_get_instrument(instrument_id.symbol.as_str())
1139 .await?;
1140
1141 let instrument = match response {
1142 Some(instrument) => instrument,
1143 None => return Ok(None),
1144 };
1145
1146 let ts_init = self.generate_ts_init();
1147
1148 Ok(parse_instrument_any(&instrument, ts_init))
1149 }
1150
1151 pub async fn request_instruments(
1157 &self,
1158 active_only: bool,
1159 ) -> anyhow::Result<Vec<InstrumentAny>> {
1160 let instruments = self.inner.http_get_instruments(active_only).await?;
1161 let ts_init = self.generate_ts_init();
1162
1163 let mut parsed_instruments = Vec::new();
1164 let mut failed_count = 0;
1165 let total_count = instruments.len();
1166
1167 for inst in instruments {
1168 if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1169 parsed_instruments.push(instrument_any);
1170 } else {
1171 failed_count += 1;
1172 tracing::error!(
1173 "Failed to parse instrument: symbol={}, type={:?}, state={:?} - instrument will not be cached",
1174 inst.symbol,
1175 inst.instrument_type,
1176 inst.state
1177 );
1178 }
1179 }
1180
1181 if failed_count > 0 {
1182 tracing::error!(
1183 "Instrument parse failures: {} failed out of {} total ({} successfully parsed)",
1184 failed_count,
1185 total_count,
1186 parsed_instruments.len()
1187 );
1188 }
1189
1190 Ok(parsed_instruments)
1191 }
1192
1193 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1203 let inner = self.inner.clone();
1204 inner.http_get_wallet().await
1205 }
1206
1207 pub async fn get_orders(
1217 &self,
1218 params: GetOrderParams,
1219 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1220 let inner = self.inner.clone();
1221 inner.http_get_orders(params).await
1222 }
1223
1224 pub async fn http_place_order(
1234 &self,
1235 params: PostOrderParams,
1236 ) -> Result<Value, BitmexHttpError> {
1237 let inner = self.inner.clone();
1238 inner.http_place_order(params).await
1239 }
1240
1241 pub async fn http_cancel_orders(
1251 &self,
1252 params: DeleteOrderParams,
1253 ) -> Result<Value, BitmexHttpError> {
1254 let inner = self.inner.clone();
1255 inner.http_cancel_orders(params).await
1256 }
1257
1258 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
1268 let inner = self.inner.clone();
1269 inner.http_amend_order(params).await
1270 }
1271
1272 pub async fn http_cancel_all_orders(
1286 &self,
1287 params: DeleteAllOrdersParams,
1288 ) -> Result<Value, BitmexHttpError> {
1289 let inner = self.inner.clone();
1290 inner.http_cancel_all_orders(params).await
1291 }
1292
1293 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1303 let cache = self.instruments_cache.lock().expect(MUTEX_POISONED);
1304 cache.get(&symbol).cloned().ok_or_else(|| {
1305 anyhow::anyhow!(
1306 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1307 )
1308 })
1309 }
1310
1311 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1318 self.instrument_from_cache(symbol)
1319 .map(|instrument| instrument.price_precision())
1320 }
1321
1322 pub async fn http_get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1328 self.inner
1329 .http_get_margin(currency)
1330 .await
1331 .map_err(|e| anyhow::anyhow!(e))
1332 }
1333
1334 pub async fn request_account_state(
1340 &self,
1341 account_id: AccountId,
1342 ) -> anyhow::Result<AccountState> {
1343 let margin = self
1345 .inner
1346 .http_get_margin("XBt")
1347 .await
1348 .map_err(|e| anyhow::anyhow!(e))?;
1349
1350 let ts_init = nautilus_core::nanos::UnixNanos::from(
1351 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1352 );
1353
1354 let margin_msg = BitmexMarginMsg {
1356 account: margin.account,
1357 currency: margin.currency,
1358 risk_limit: margin.risk_limit,
1359 amount: margin.amount,
1360 prev_realised_pnl: margin.prev_realised_pnl,
1361 gross_comm: margin.gross_comm,
1362 gross_open_cost: margin.gross_open_cost,
1363 gross_open_premium: margin.gross_open_premium,
1364 gross_exec_cost: margin.gross_exec_cost,
1365 gross_mark_value: margin.gross_mark_value,
1366 risk_value: margin.risk_value,
1367 init_margin: margin.init_margin,
1368 maint_margin: margin.maint_margin,
1369 target_excess_margin: margin.target_excess_margin,
1370 realised_pnl: margin.realised_pnl,
1371 unrealised_pnl: margin.unrealised_pnl,
1372 wallet_balance: margin.wallet_balance,
1373 margin_balance: margin.margin_balance,
1374 margin_leverage: margin.margin_leverage,
1375 margin_used_pcnt: margin.margin_used_pcnt,
1376 excess_margin: margin.excess_margin,
1377 available_margin: margin.available_margin,
1378 withdrawable_margin: margin.withdrawable_margin,
1379 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1382 foreign_margin_balance: None,
1383 foreign_requirement: None,
1384 };
1385
1386 parse_account_state(&margin_msg, account_id, ts_init)
1387 }
1388
1389 #[allow(clippy::too_many_arguments)]
1396 pub async fn submit_order(
1397 &self,
1398 instrument_id: InstrumentId,
1399 client_order_id: ClientOrderId,
1400 order_side: OrderSide,
1401 order_type: OrderType,
1402 quantity: Quantity,
1403 time_in_force: TimeInForce,
1404 price: Option<Price>,
1405 trigger_price: Option<Price>,
1406 trigger_type: Option<TriggerType>,
1407 display_qty: Option<Quantity>,
1408 post_only: bool,
1409 reduce_only: bool,
1410 order_list_id: Option<OrderListId>,
1411 contingency_type: Option<ContingencyType>,
1412 ) -> anyhow::Result<OrderStatusReport> {
1413 use crate::common::enums::{
1414 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1415 };
1416
1417 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1418
1419 let mut params = super::query::PostOrderParamsBuilder::default();
1420 params.text(NAUTILUS_TRADER);
1421 params.symbol(instrument_id.symbol.as_str());
1422 params.cl_ord_id(client_order_id.as_str());
1423
1424 let side = BitmexSide::try_from_order_side(order_side)?;
1425 params.side(side);
1426
1427 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1428 params.ord_type(ord_type);
1429
1430 params.order_qty(quantity_to_u32(&quantity, &instrument));
1431
1432 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1433 params.time_in_force(tif);
1434
1435 if let Some(price) = price {
1436 params.price(price.as_f64());
1437 }
1438
1439 if let Some(trigger_price) = trigger_price {
1440 params.stop_px(trigger_price.as_f64());
1441 }
1442
1443 if let Some(display_qty) = display_qty {
1444 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1445 }
1446
1447 if let Some(order_list_id) = order_list_id {
1448 params.cl_ord_link_id(order_list_id.as_str());
1449 }
1450
1451 let mut exec_inst = Vec::new();
1452
1453 if post_only {
1454 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1455 }
1456
1457 if reduce_only {
1458 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1459 }
1460
1461 if trigger_price.is_some()
1462 && let Some(trigger_type) = trigger_type
1463 {
1464 match trigger_type {
1465 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1466 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1467 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1468 _ => {} }
1470 }
1471
1472 if !exec_inst.is_empty() {
1473 params.exec_inst(exec_inst);
1474 }
1475
1476 if let Some(contingency_type) = contingency_type {
1477 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1478 params.contingency_type(bitmex_contingency);
1479 }
1480
1481 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1482
1483 let response = self.inner.http_place_order(params).await?;
1484
1485 let order: BitmexOrder = serde_json::from_value(response)?;
1486
1487 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1488 let reason = order
1489 .ord_rej_reason
1490 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1491 anyhow::bail!("Order rejected: {reason}");
1492 }
1493
1494 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1495 let ts_init = self.generate_ts_init();
1496
1497 parse_order_status_report(&order, &instrument, ts_init)
1498 }
1499
1500 pub async fn cancel_order(
1510 &self,
1511 instrument_id: InstrumentId,
1512 client_order_id: Option<ClientOrderId>,
1513 venue_order_id: Option<VenueOrderId>,
1514 ) -> anyhow::Result<OrderStatusReport> {
1515 let mut params = super::query::DeleteOrderParamsBuilder::default();
1516 params.text(NAUTILUS_TRADER);
1517
1518 if let Some(venue_order_id) = venue_order_id {
1519 params.order_id(vec![venue_order_id.as_str().to_string()]);
1520 } else if let Some(client_order_id) = client_order_id {
1521 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1522 } else {
1523 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1524 }
1525
1526 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1527
1528 let response = self.inner.http_cancel_orders(params).await?;
1529
1530 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1531 let order = orders
1532 .into_iter()
1533 .next()
1534 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1535
1536 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1537 let ts_init = self.generate_ts_init();
1538
1539 parse_order_status_report(&order, &instrument, ts_init)
1540 }
1541
1542 pub async fn cancel_orders(
1552 &self,
1553 instrument_id: InstrumentId,
1554 client_order_ids: Option<Vec<ClientOrderId>>,
1555 venue_order_ids: Option<Vec<VenueOrderId>>,
1556 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1557 let mut params = super::query::DeleteOrderParamsBuilder::default();
1558 params.text(NAUTILUS_TRADER);
1559
1560 if let Some(venue_order_ids) = venue_order_ids {
1563 if venue_order_ids.is_empty() {
1564 anyhow::bail!("venue_order_ids cannot be empty");
1565 }
1566 params.order_id(
1567 venue_order_ids
1568 .iter()
1569 .map(|id| id.to_string())
1570 .collect::<Vec<_>>(),
1571 );
1572 } else if let Some(client_order_ids) = client_order_ids {
1573 if client_order_ids.is_empty() {
1574 anyhow::bail!("client_order_ids cannot be empty");
1575 }
1576 params.cl_ord_id(
1577 client_order_ids
1578 .iter()
1579 .map(|id| id.to_string())
1580 .collect::<Vec<_>>(),
1581 );
1582 } else {
1583 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1584 }
1585
1586 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1587
1588 let response = self.inner.http_cancel_orders(params).await?;
1589
1590 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1591
1592 let ts_init = self.generate_ts_init();
1593 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1594
1595 let mut reports = Vec::new();
1596
1597 for order in orders {
1598 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1599 }
1600
1601 Self::populate_linked_order_ids(&mut reports);
1602
1603 Ok(reports)
1604 }
1605
1606 pub async fn cancel_all_orders(
1616 &self,
1617 instrument_id: InstrumentId,
1618 order_side: Option<OrderSide>,
1619 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1620 let mut params = DeleteAllOrdersParamsBuilder::default();
1621 params.text(NAUTILUS_TRADER);
1622 params.symbol(instrument_id.symbol.as_str());
1623
1624 if let Some(side) = order_side {
1625 let side = BitmexSide::try_from_order_side(side)?;
1626 params.filter(serde_json::json!({
1627 "side": side
1628 }));
1629 }
1630
1631 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1632
1633 let response = self.inner.http_cancel_all_orders(params).await?;
1634
1635 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1636
1637 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1638 let ts_init = self.generate_ts_init();
1639
1640 let mut reports = Vec::new();
1641
1642 for order in orders {
1643 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1644 }
1645
1646 Self::populate_linked_order_ids(&mut reports);
1647
1648 Ok(reports)
1649 }
1650
1651 pub async fn modify_order(
1662 &self,
1663 instrument_id: InstrumentId,
1664 client_order_id: Option<ClientOrderId>,
1665 venue_order_id: Option<VenueOrderId>,
1666 quantity: Option<Quantity>,
1667 price: Option<Price>,
1668 trigger_price: Option<Price>,
1669 ) -> anyhow::Result<OrderStatusReport> {
1670 let mut params = PutOrderParamsBuilder::default();
1671 params.text(NAUTILUS_TRADER);
1672
1673 if let Some(venue_order_id) = venue_order_id {
1675 params.order_id(venue_order_id.as_str());
1676 } else if let Some(client_order_id) = client_order_id {
1677 params.orig_cl_ord_id(client_order_id.as_str());
1678 } else {
1679 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1680 }
1681
1682 if let Some(quantity) = quantity {
1683 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1684 params.order_qty(quantity_to_u32(&quantity, &instrument));
1685 }
1686
1687 if let Some(price) = price {
1688 params.price(price.as_f64());
1689 }
1690
1691 if let Some(trigger_price) = trigger_price {
1692 params.stop_px(trigger_price.as_f64());
1693 }
1694
1695 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1696
1697 let response = self.inner.http_amend_order(params).await?;
1698
1699 let order: BitmexOrder = serde_json::from_value(response)?;
1700
1701 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1702 let reason = order
1703 .ord_rej_reason
1704 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1705 anyhow::bail!("Order modification rejected: {reason}");
1706 }
1707
1708 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1709 let ts_init = self.generate_ts_init();
1710
1711 parse_order_status_report(&order, &instrument, ts_init)
1712 }
1713
1714 pub async fn query_order(
1723 &self,
1724 instrument_id: InstrumentId,
1725 client_order_id: Option<ClientOrderId>,
1726 venue_order_id: Option<VenueOrderId>,
1727 ) -> anyhow::Result<Option<OrderStatusReport>> {
1728 let mut params = GetOrderParamsBuilder::default();
1729
1730 let filter_json = if let Some(client_order_id) = client_order_id {
1731 serde_json::json!({
1732 "clOrdID": client_order_id.to_string()
1733 })
1734 } else if let Some(venue_order_id) = venue_order_id {
1735 serde_json::json!({
1736 "orderID": venue_order_id.to_string()
1737 })
1738 } else {
1739 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1740 };
1741
1742 params.filter(filter_json);
1743 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1746
1747 let response = self.inner.http_get_orders(params).await?;
1748
1749 if response.is_empty() {
1750 return Ok(None);
1751 }
1752
1753 let order = &response[0];
1754
1755 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1756 let ts_init = self.generate_ts_init();
1757
1758 let report = parse_order_status_report(order, &instrument, ts_init)?;
1759
1760 Ok(Some(report))
1761 }
1762
1763 pub async fn request_order_status_report(
1772 &self,
1773 instrument_id: InstrumentId,
1774 client_order_id: Option<ClientOrderId>,
1775 venue_order_id: Option<VenueOrderId>,
1776 ) -> anyhow::Result<OrderStatusReport> {
1777 let mut params = GetOrderParamsBuilder::default();
1778 params.symbol(instrument_id.symbol.as_str());
1779
1780 if let Some(venue_order_id) = venue_order_id {
1781 params.filter(serde_json::json!({
1782 "orderID": venue_order_id.as_str()
1783 }));
1784 } else if let Some(client_order_id) = client_order_id {
1785 params.filter(serde_json::json!({
1786 "clOrdID": client_order_id.as_str()
1787 }));
1788 }
1789
1790 params.count(1i32);
1791 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1792
1793 let response = self.inner.http_get_orders(params).await?;
1794
1795 let order = response
1796 .into_iter()
1797 .next()
1798 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1799
1800 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1801 let ts_init = self.generate_ts_init();
1802
1803 parse_order_status_report(&order, &instrument, ts_init)
1804 }
1805
1806 pub async fn request_order_status_reports(
1815 &self,
1816 instrument_id: Option<InstrumentId>,
1817 open_only: bool,
1818 limit: Option<u32>,
1819 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1820 let mut params = GetOrderParamsBuilder::default();
1821
1822 if let Some(instrument_id) = &instrument_id {
1823 params.symbol(instrument_id.symbol.as_str());
1824 }
1825
1826 if open_only {
1827 params.filter(serde_json::json!({
1828 "open": true
1829 }));
1830 }
1831
1832 if let Some(limit) = limit {
1833 params.count(limit as i32);
1834 } else {
1835 params.count(500); }
1837
1838 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1841
1842 let response = self.inner.http_get_orders(params).await?;
1843
1844 let ts_init = self.generate_ts_init();
1845
1846 let mut reports = Vec::new();
1847
1848 for order in response {
1849 let Some(symbol) = order.symbol else {
1851 tracing::warn!("Order response missing symbol, skipping");
1852 continue;
1853 };
1854
1855 let instrument = self.instrument_from_cache(symbol)?;
1856
1857 match parse_order_status_report(&order, &instrument, ts_init) {
1858 Ok(report) => reports.push(report),
1859 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1860 }
1861 }
1862
1863 Self::populate_linked_order_ids(&mut reports);
1864
1865 Ok(reports)
1866 }
1867
1868 pub async fn request_trades(
1874 &self,
1875 instrument_id: InstrumentId,
1876 start: Option<DateTime<Utc>>,
1877 end: Option<DateTime<Utc>>,
1878 limit: Option<u32>,
1879 ) -> anyhow::Result<Vec<TradeTick>> {
1880 let mut params = GetTradeParamsBuilder::default();
1881 params.symbol(instrument_id.symbol.as_str());
1882
1883 if let Some(start) = start {
1884 params.start_time(start);
1885 }
1886
1887 if let Some(end) = end {
1888 params.end_time(end);
1889 }
1890
1891 if let (Some(start), Some(end)) = (start, end) {
1892 anyhow::ensure!(
1893 start < end,
1894 "Invalid time range: start={start:?} end={end:?}",
1895 );
1896 }
1897
1898 if let Some(limit) = limit {
1899 let clamped_limit = limit.min(1000);
1900 if limit > 1000 {
1901 tracing::warn!(
1902 limit,
1903 clamped_limit,
1904 "BitMEX trade request limit exceeds venue maximum; clamping",
1905 );
1906 }
1907 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1908 }
1909 params.reverse(false);
1910 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1911
1912 let response = self.inner.http_get_trades(params).await?;
1913
1914 let ts_init = self.generate_ts_init();
1915
1916 let mut parsed_trades = Vec::new();
1917
1918 for trade in response {
1919 if let Some(start) = start
1920 && trade.timestamp < start
1921 {
1922 continue;
1923 }
1924
1925 if let Some(end) = end
1926 && trade.timestamp > end
1927 {
1928 continue;
1929 }
1930
1931 let price_precision = self.get_price_precision(trade.symbol)?;
1932
1933 match parse_trade(trade, price_precision, ts_init) {
1934 Ok(trade) => parsed_trades.push(trade),
1935 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1936 }
1937 }
1938
1939 Ok(parsed_trades)
1940 }
1941
1942 pub async fn request_bars(
1949 &self,
1950 mut bar_type: BarType,
1951 start: Option<DateTime<Utc>>,
1952 end: Option<DateTime<Utc>>,
1953 limit: Option<u32>,
1954 partial: bool,
1955 ) -> anyhow::Result<Vec<Bar>> {
1956 bar_type = bar_type.standard();
1957
1958 anyhow::ensure!(
1959 bar_type.aggregation_source() == AggregationSource::External,
1960 "Only EXTERNAL aggregation bars are supported"
1961 );
1962 anyhow::ensure!(
1963 bar_type.spec().price_type == PriceType::Last,
1964 "Only LAST price type bars are supported"
1965 );
1966 if let (Some(start), Some(end)) = (start, end) {
1967 anyhow::ensure!(
1968 start < end,
1969 "Invalid time range: start={start:?} end={end:?}"
1970 );
1971 }
1972
1973 let spec = bar_type.spec();
1974 let bin_size = match (spec.aggregation, spec.step.get()) {
1975 (BarAggregation::Minute, 1) => "1m",
1976 (BarAggregation::Minute, 5) => "5m",
1977 (BarAggregation::Hour, 1) => "1h",
1978 (BarAggregation::Day, 1) => "1d",
1979 _ => anyhow::bail!(
1980 "BitMEX does not support {}-{:?}-{:?} bars",
1981 spec.step.get(),
1982 spec.aggregation,
1983 spec.price_type,
1984 ),
1985 };
1986
1987 let instrument_id = bar_type.instrument_id();
1988 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1989
1990 let mut params = GetTradeBucketedParamsBuilder::default();
1991 params.symbol(instrument_id.symbol.as_str());
1992 params.bin_size(bin_size);
1993 if partial {
1994 params.partial(true);
1995 }
1996 if let Some(start) = start {
1997 params.start_time(start);
1998 }
1999 if let Some(end) = end {
2000 params.end_time(end);
2001 }
2002 if let Some(limit) = limit {
2003 let clamped_limit = limit.min(1000);
2004 if limit > 1000 {
2005 tracing::warn!(
2006 limit,
2007 clamped_limit,
2008 "BitMEX bar request limit exceeds venue maximum; clamping",
2009 );
2010 }
2011 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2012 }
2013 params.reverse(false);
2014 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2015
2016 let response = self.inner.http_get_trade_bucketed(params).await?;
2017 let ts_init = self.generate_ts_init();
2018 let mut bars = Vec::new();
2019
2020 for bin in response {
2021 if let Some(start) = start
2022 && bin.timestamp < start
2023 {
2024 continue;
2025 }
2026 if let Some(end) = end
2027 && bin.timestamp > end
2028 {
2029 continue;
2030 }
2031 if bin.symbol != instrument_id.symbol.inner() {
2032 tracing::warn!(
2033 symbol = %bin.symbol,
2034 expected = %instrument_id.symbol,
2035 "Skipping trade bin for unexpected symbol",
2036 );
2037 continue;
2038 }
2039
2040 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2041 Ok(bar) => bars.push(bar),
2042 Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2043 }
2044 }
2045
2046 Ok(bars)
2047 }
2048
2049 pub async fn request_fill_reports(
2055 &self,
2056 instrument_id: Option<InstrumentId>,
2057 limit: Option<u32>,
2058 ) -> anyhow::Result<Vec<FillReport>> {
2059 let mut params = GetExecutionParamsBuilder::default();
2060 if let Some(instrument_id) = instrument_id {
2061 params.symbol(instrument_id.symbol.as_str());
2062 }
2063 if let Some(limit) = limit {
2064 params.count(limit as i32);
2065 } else {
2066 params.count(500); }
2068 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2071
2072 let response = self.inner.http_get_executions(params).await?;
2073
2074 let ts_init = self.generate_ts_init();
2075
2076 let mut reports = Vec::new();
2077
2078 for exec in response {
2079 let Some(symbol) = exec.symbol else {
2081 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2082 continue;
2083 };
2084 let symbol_str = symbol.to_string();
2085
2086 let instrument = match self.instrument_from_cache(symbol) {
2087 Ok(instrument) => instrument,
2088 Err(e) => {
2089 tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {e}");
2090 continue;
2091 }
2092 };
2093
2094 match parse_fill_report(exec, &instrument, ts_init) {
2095 Ok(report) => reports.push(report),
2096 Err(e) => {
2097 let error_msg = e.to_string();
2099 if error_msg.starts_with("Skipping non-trade execution")
2100 || error_msg.starts_with("Skipping execution without order_id")
2101 {
2102 tracing::debug!("{e}");
2103 } else {
2104 tracing::error!("Failed to parse fill report: {e}");
2105 }
2106 }
2107 }
2108 }
2109
2110 Ok(reports)
2111 }
2112
2113 pub async fn request_position_status_reports(
2119 &self,
2120 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2121 let params = GetPositionParamsBuilder::default()
2122 .count(500) .build()
2124 .map_err(|e| anyhow::anyhow!(e))?;
2125
2126 let response = self.inner.http_get_positions(params).await?;
2127
2128 let ts_init = self.generate_ts_init();
2129
2130 let mut reports = Vec::new();
2131
2132 for pos in response {
2133 let symbol = Ustr::from(pos.symbol.as_str());
2134 let instrument = match self.instrument_from_cache(symbol) {
2135 Ok(instrument) => instrument,
2136 Err(e) => {
2137 tracing::error!(
2138 symbol = pos.symbol.as_str(),
2139 "Instrument not found in cache for position parsing: {e}"
2140 );
2141 continue;
2142 }
2143 };
2144
2145 match parse_position_report(pos, &instrument, ts_init) {
2146 Ok(report) => reports.push(report),
2147 Err(e) => tracing::error!("Failed to parse position report: {e}"),
2148 }
2149 }
2150
2151 Ok(reports)
2152 }
2153
2154 pub async fn update_position_leverage(
2162 &self,
2163 symbol: &str,
2164 leverage: f64,
2165 ) -> anyhow::Result<PositionStatusReport> {
2166 let params = PostPositionLeverageParams {
2167 symbol: symbol.to_string(),
2168 leverage,
2169 target_account_id: None,
2170 };
2171
2172 let response = self.inner.http_update_position_leverage(params).await?;
2173
2174 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2175 let ts_init = self.generate_ts_init();
2176
2177 parse_position_report(response, &instrument, ts_init)
2178 }
2179}
2180
2181#[cfg(test)]
2186mod tests {
2187 use nautilus_core::UUID4;
2188 use nautilus_model::enums::OrderStatus;
2189 use rstest::rstest;
2190 use serde_json::json;
2191
2192 use super::*;
2193
2194 fn build_report(
2195 client_order_id: &str,
2196 venue_order_id: &str,
2197 contingency_type: ContingencyType,
2198 order_list_id: Option<&str>,
2199 ) -> OrderStatusReport {
2200 let mut report = OrderStatusReport::new(
2201 AccountId::from("BITMEX-1"),
2202 InstrumentId::from("XBTUSD.BITMEX"),
2203 Some(ClientOrderId::from(client_order_id)),
2204 VenueOrderId::from(venue_order_id),
2205 OrderSide::Buy,
2206 OrderType::Limit,
2207 TimeInForce::Gtc,
2208 OrderStatus::Accepted,
2209 Quantity::new(100.0, 0),
2210 Quantity::default(),
2211 UnixNanos::from(1_u64),
2212 UnixNanos::from(1_u64),
2213 UnixNanos::from(1_u64),
2214 Some(UUID4::new()),
2215 );
2216
2217 if let Some(id) = order_list_id {
2218 report = report.with_order_list_id(OrderListId::from(id));
2219 }
2220
2221 report.with_contingency_type(contingency_type)
2222 }
2223
2224 #[rstest]
2225 fn test_sign_request_generates_correct_headers() {
2226 let client = BitmexHttpInnerClient::with_credentials(
2227 "test_api_key".to_string(),
2228 "test_api_secret".to_string(),
2229 "http://localhost:8080".to_string(),
2230 Some(60),
2231 None, None, None, None, None, None, )
2238 .expect("Failed to create test client");
2239
2240 let headers = client
2241 .sign_request(&Method::GET, "/api/v1/order", None)
2242 .unwrap();
2243
2244 assert!(headers.contains_key("api-key"));
2245 assert!(headers.contains_key("api-signature"));
2246 assert!(headers.contains_key("api-expires"));
2247 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2248 }
2249
2250 #[rstest]
2251 fn test_sign_request_with_body() {
2252 let client = BitmexHttpInnerClient::with_credentials(
2253 "test_api_key".to_string(),
2254 "test_api_secret".to_string(),
2255 "http://localhost:8080".to_string(),
2256 Some(60),
2257 None, None, None, None, None, None, )
2264 .expect("Failed to create test client");
2265
2266 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2267 let body_bytes = serde_json::to_vec(&body).unwrap();
2268
2269 let headers_without_body = client
2270 .sign_request(&Method::POST, "/api/v1/order", None)
2271 .unwrap();
2272 let headers_with_body = client
2273 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2274 .unwrap();
2275
2276 assert_ne!(
2278 headers_without_body.get("api-signature").unwrap(),
2279 headers_with_body.get("api-signature").unwrap()
2280 );
2281 }
2282
2283 #[rstest]
2284 fn test_sign_request_uses_custom_recv_window() {
2285 let client_default = BitmexHttpInnerClient::with_credentials(
2286 "test_api_key".to_string(),
2287 "test_api_secret".to_string(),
2288 "http://localhost:8080".to_string(),
2289 Some(60),
2290 None,
2291 None,
2292 None,
2293 None, None, None, )
2297 .expect("Failed to create test client");
2298
2299 let client_custom = BitmexHttpInnerClient::with_credentials(
2300 "test_api_key".to_string(),
2301 "test_api_secret".to_string(),
2302 "http://localhost:8080".to_string(),
2303 Some(60),
2304 None,
2305 None,
2306 None,
2307 Some(30_000), None, None, )
2311 .expect("Failed to create test client");
2312
2313 let headers_default = client_default
2314 .sign_request(&Method::GET, "/api/v1/order", None)
2315 .unwrap();
2316 let headers_custom = client_custom
2317 .sign_request(&Method::GET, "/api/v1/order", None)
2318 .unwrap();
2319
2320 let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2322 let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2323
2324 let now = Utc::now().timestamp();
2326 assert!(expires_default > now);
2327 assert!(expires_custom > now);
2328
2329 assert!(expires_custom > expires_default);
2331
2332 let diff = expires_custom - expires_default;
2335 assert!((18..=25).contains(&diff));
2336 }
2337
2338 #[rstest]
2339 fn test_populate_linked_order_ids_from_order_list() {
2340 let base = "O-20250922-002219-001-000";
2341 let entry = format!("{base}-1");
2342 let stop = format!("{base}-2");
2343 let take = format!("{base}-3");
2344
2345 let mut reports = vec![
2346 build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2347 build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2348 build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2349 ];
2350
2351 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2352
2353 assert_eq!(
2354 reports[0].linked_order_ids,
2355 Some(vec![
2356 ClientOrderId::from(stop.as_str()),
2357 ClientOrderId::from(take.as_str()),
2358 ]),
2359 );
2360 assert_eq!(
2361 reports[1].linked_order_ids,
2362 Some(vec![
2363 ClientOrderId::from(entry.as_str()),
2364 ClientOrderId::from(take.as_str()),
2365 ]),
2366 );
2367 assert_eq!(
2368 reports[2].linked_order_ids,
2369 Some(vec![
2370 ClientOrderId::from(entry.as_str()),
2371 ClientOrderId::from(stop.as_str()),
2372 ]),
2373 );
2374 }
2375
2376 #[rstest]
2377 fn test_populate_linked_order_ids_from_id_prefix() {
2378 let base = "O-20250922-002220-001-000";
2379 let entry = format!("{base}-1");
2380 let stop = format!("{base}-2");
2381 let take = format!("{base}-3");
2382
2383 let mut reports = vec![
2384 build_report(&entry, "V-1", ContingencyType::Oto, None),
2385 build_report(&stop, "V-2", ContingencyType::Ouo, None),
2386 build_report(&take, "V-3", ContingencyType::Ouo, None),
2387 ];
2388
2389 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2390
2391 assert_eq!(
2392 reports[0].linked_order_ids,
2393 Some(vec![
2394 ClientOrderId::from(stop.as_str()),
2395 ClientOrderId::from(take.as_str()),
2396 ]),
2397 );
2398 assert_eq!(
2399 reports[1].linked_order_ids,
2400 Some(vec![
2401 ClientOrderId::from(entry.as_str()),
2402 ClientOrderId::from(take.as_str()),
2403 ]),
2404 );
2405 assert_eq!(
2406 reports[2].linked_order_ids,
2407 Some(vec![
2408 ClientOrderId::from(entry.as_str()),
2409 ClientOrderId::from(stop.as_str()),
2410 ]),
2411 );
2412 }
2413
2414 #[rstest]
2415 fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2416 let base = "O-20250922-002221-001-000";
2417 let entry = format!("{base}-1");
2418 let passive = format!("{base}-2");
2419
2420 let mut reports = vec![
2421 build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2422 build_report(&passive, "V-2", ContingencyType::Ouo, None),
2423 ];
2424
2425 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2426
2427 assert!(reports[0].linked_order_ids.is_none());
2429
2430 assert!(reports[1].linked_order_ids.is_none());
2432 assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2433 }
2434}