1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{Arc, Mutex},
29};
30
31use ahash::AHashMap;
32use chrono::{DateTime, Utc};
33use nautilus_core::{
34 UnixNanos,
35 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
36 env::get_env_var,
37 time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40 data::{Bar, BarType, TradeTick},
41 enums::{
42 AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType, PriceType,
43 TimeInForce, TriggerType,
44 },
45 events::AccountState,
46 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
47 instruments::{Instrument as InstrumentTrait, InstrumentAny},
48 reports::{FillReport, OrderStatusReport, PositionStatusReport},
49 types::{Price, Quantity},
50};
51use nautilus_network::{
52 http::HttpClient,
53 ratelimiter::quota::Quota,
54 retry::{RetryConfig, RetryManager},
55};
56use reqwest::{Method, StatusCode, header::USER_AGENT};
57use serde::{Deserialize, Serialize, de::DeserializeOwned};
58use serde_json::Value;
59use tokio_util::sync::CancellationToken;
60use ustr::Ustr;
61
62use super::{
63 error::{BitmexErrorResponse, BitmexHttpError},
64 models::{
65 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
66 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
67 },
68 query::{
69 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
70 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
71 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
72 PostPositionLeverageParams, PutOrderParams,
73 },
74};
75use crate::{
76 common::{
77 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
78 credential::Credential,
79 enums::{BitmexContingencyType, BitmexOrderStatus, BitmexSide},
80 parse::{parse_account_state, quantity_to_u32},
81 },
82 http::{
83 parse::{
84 parse_fill_report, parse_instrument_any, parse_order_status_report,
85 parse_position_report, parse_trade, parse_trade_bin,
86 },
87 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
88 },
89 websocket::messages::BitmexMarginMsg,
90};
91
/// Fallback per-second request cap applied when the caller does not supply one.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Fallback per-minute request cap for clients constructed with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Fallback per-minute request cap for clients constructed without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
103
104#[derive(Debug, Serialize, Deserialize)]
106pub struct BitmexResponse<T> {
107 pub data: Vec<T>,
109}
110
111#[derive(Debug, Clone)]
131pub struct BitmexHttpInnerClient {
132 base_url: String,
133 client: HttpClient,
134 credential: Option<Credential>,
135 recv_window_ms: u64,
136 retry_manager: RetryManager<BitmexHttpError>,
137 cancellation_token: CancellationToken,
138}
139
140impl Default for BitmexHttpInnerClient {
141 fn default() -> Self {
142 Self::new(None, Some(60), None, None, None, None, None, None)
143 .expect("Failed to create default BitmexHttpInnerClient")
144 }
145}
146
147impl BitmexHttpInnerClient {
148 pub fn cancel_all_requests(&self) {
150 self.cancellation_token.cancel();
151 }
152
153 pub fn cancellation_token(&self) -> &CancellationToken {
155 &self.cancellation_token
156 }
157 #[allow(clippy::too_many_arguments)]
167 pub fn new(
168 base_url: Option<String>,
169 timeout_secs: Option<u64>,
170 max_retries: Option<u32>,
171 retry_delay_ms: Option<u64>,
172 retry_delay_max_ms: Option<u64>,
173 recv_window_ms: Option<u64>,
174 max_requests_per_second: Option<u32>,
175 max_requests_per_minute: Option<u32>,
176 ) -> Result<Self, BitmexHttpError> {
177 let retry_config = RetryConfig {
178 max_retries: max_retries.unwrap_or(3),
179 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
180 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
181 backoff_factor: 2.0,
182 jitter_ms: 1000,
183 operation_timeout_ms: Some(60_000),
184 immediate_first: false,
185 max_elapsed_ms: Some(180_000),
186 };
187
188 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
189 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
190 })?;
191
192 let max_req_per_sec =
193 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
194 let max_req_per_min =
195 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
196
197 Ok(Self {
198 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
199 client: HttpClient::new(
200 Self::default_headers(),
201 vec![],
202 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
203 Some(Self::default_quota(max_req_per_sec)),
204 timeout_secs,
205 ),
206 credential: None,
207 recv_window_ms: recv_window_ms.unwrap_or(10_000),
208 retry_manager,
209 cancellation_token: CancellationToken::new(),
210 })
211 }
212
213 #[allow(clippy::too_many_arguments)]
220 pub fn with_credentials(
221 api_key: String,
222 api_secret: String,
223 base_url: String,
224 timeout_secs: Option<u64>,
225 max_retries: Option<u32>,
226 retry_delay_ms: Option<u64>,
227 retry_delay_max_ms: Option<u64>,
228 recv_window_ms: Option<u64>,
229 max_requests_per_second: Option<u32>,
230 max_requests_per_minute: Option<u32>,
231 ) -> Result<Self, BitmexHttpError> {
232 let retry_config = RetryConfig {
233 max_retries: max_retries.unwrap_or(3),
234 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
235 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
236 backoff_factor: 2.0,
237 jitter_ms: 1000,
238 operation_timeout_ms: Some(60_000),
239 immediate_first: false,
240 max_elapsed_ms: Some(180_000),
241 };
242
243 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
244 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
245 })?;
246
247 let max_req_per_sec =
248 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
249 let max_req_per_min =
250 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
251
252 Ok(Self {
253 base_url,
254 client: HttpClient::new(
255 Self::default_headers(),
256 vec![],
257 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
258 Some(Self::default_quota(max_req_per_sec)),
259 timeout_secs,
260 ),
261 credential: Some(Credential::new(api_key, api_secret)),
262 recv_window_ms: recv_window_ms.unwrap_or(10_000),
263 retry_manager,
264 cancellation_token: CancellationToken::new(),
265 })
266 }
267
268 fn default_headers() -> HashMap<String, String> {
269 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
270 }
271
272 fn default_quota(max_requests_per_second: u32) -> Quota {
273 Quota::per_second(
274 NonZeroU32::new(max_requests_per_second)
275 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
276 )
277 }
278
279 fn rate_limiter_quotas(
280 max_requests_per_second: u32,
281 max_requests_per_minute: u32,
282 ) -> Vec<(String, Quota)> {
283 let per_sec_quota = Quota::per_second(
284 NonZeroU32::new(max_requests_per_second)
285 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
286 );
287 let per_min_quota =
288 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
289 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
290 }));
291
292 vec![
293 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
294 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
295 ]
296 }
297
298 fn rate_limit_keys() -> Vec<Ustr> {
299 vec![
300 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
301 Ustr::from(BITMEX_MINUTE_RATE_KEY),
302 ]
303 }
304
305 fn sign_request(
306 &self,
307 method: &Method,
308 endpoint: &str,
309 body: Option<&[u8]>,
310 ) -> Result<HashMap<String, String>, BitmexHttpError> {
311 let credential = self
312 .credential
313 .as_ref()
314 .ok_or(BitmexHttpError::MissingCredentials)?;
315
316 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
317 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
318
319 let full_path = if endpoint.starts_with("/api/v1") {
320 endpoint.to_string()
321 } else {
322 format!("/api/v1{endpoint}")
323 };
324
325 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
326
327 let mut headers = HashMap::new();
328 headers.insert("api-expires".to_string(), expires.to_string());
329 headers.insert("api-key".to_string(), credential.api_key.to_string());
330 headers.insert("api-signature".to_string(), signature);
331
332 if body.is_some()
334 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
335 {
336 headers.insert(
337 "Content-Type".to_string(),
338 "application/x-www-form-urlencoded".to_string(),
339 );
340 }
341
342 Ok(headers)
343 }
344
345 async fn send_request<T: DeserializeOwned>(
346 &self,
347 method: Method,
348 endpoint: &str,
349 body: Option<Vec<u8>>,
350 authenticate: bool,
351 ) -> Result<T, BitmexHttpError> {
352 let endpoint = endpoint.to_string();
353 let url = format!("{}{endpoint}", self.base_url);
354 let method_clone = method.clone();
355 let body_clone = body.clone();
356
357 let operation = || {
358 let url = url.clone();
359 let method = method_clone.clone();
360 let body = body_clone.clone();
361 let endpoint = endpoint.clone();
362
363 async move {
364 let headers = if authenticate {
365 Some(self.sign_request(&method, endpoint.as_str(), body.as_deref())?)
366 } else {
367 None
368 };
369
370 let rate_keys = Self::rate_limit_keys();
371 let resp = self
372 .client
373 .request_with_ustr_keys(method, url, headers, body, None, Some(rate_keys))
374 .await?;
375
376 if resp.status.is_success() {
377 serde_json::from_slice(&resp.body).map_err(Into::into)
378 } else if let Ok(error_resp) =
379 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
380 {
381 Err(error_resp.into())
382 } else {
383 Err(BitmexHttpError::UnexpectedStatus {
384 status: StatusCode::from_u16(resp.status.as_u16())
385 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
386 body: String::from_utf8_lossy(&resp.body).to_string(),
387 })
388 }
389 }
390 };
391
392 let should_retry = |error: &BitmexHttpError| -> bool {
409 match error {
410 BitmexHttpError::NetworkError(_) => true,
411 BitmexHttpError::UnexpectedStatus { status, .. } => {
412 status.as_u16() >= 500 || status.as_u16() == 429
413 }
414 BitmexHttpError::BitmexError {
415 error_name,
416 message,
417 } => {
418 error_name == "RateLimitError"
419 || (error_name == "HTTPError"
420 && message.to_lowercase().contains("rate limit"))
421 }
422 _ => false,
423 }
424 };
425
426 let create_error = |msg: String| -> BitmexHttpError {
427 if msg == "canceled" {
428 BitmexHttpError::NetworkError("Request canceled".to_string())
429 } else {
430 BitmexHttpError::NetworkError(msg)
431 }
432 };
433
434 self.retry_manager
435 .execute_with_retry_with_cancel(
436 endpoint.as_str(),
437 operation,
438 should_retry,
439 create_error,
440 &self.cancellation_token,
441 )
442 .await
443 }
444
445 pub async fn http_get_instruments(
451 &self,
452 active_only: bool,
453 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
454 let path = if active_only {
455 "/instrument/active"
456 } else {
457 "/instrument"
458 };
459 self.send_request(Method::GET, path, None, false).await
460 }
461
462 pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
472 let response: BitmexApiInfo = self.send_request(Method::GET, "", None, false).await?;
473 Ok(response.timestamp)
474 }
475
476 pub async fn http_get_instrument(
487 &self,
488 symbol: &str,
489 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
490 let path = &format!("/instrument?symbol={symbol}");
491 let instruments: Vec<BitmexInstrument> =
492 self.send_request(Method::GET, path, None, false).await?;
493
494 Ok(instruments.into_iter().next())
495 }
496
497 pub async fn http_get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
503 let endpoint = "/user/wallet";
504 self.send_request(Method::GET, endpoint, None, true).await
505 }
506
507 pub async fn http_get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
513 let path = format!("/user/margin?currency={currency}");
514 self.send_request(Method::GET, &path, None, true).await
515 }
516
517 pub async fn http_get_trades(
527 &self,
528 params: GetTradeParams,
529 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
530 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
531 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
532 })?;
533 let path = format!("/trade?{query}");
534 self.send_request(Method::GET, &path, None, true).await
535 }
536
537 pub async fn http_get_trade_bucketed(
543 &self,
544 params: GetTradeBucketedParams,
545 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
546 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
547 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
548 })?;
549 let path = format!("/trade/bucketed?{query}");
550 self.send_request(Method::GET, &path, None, true).await
551 }
552
553 pub async fn http_get_orders(
563 &self,
564 params: GetOrderParams,
565 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
566 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
567 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
568 })?;
569 let path = format!("/order?{query}");
570 self.send_request(Method::GET, &path, None, true).await
571 }
572
573 pub async fn http_place_order(
583 &self,
584 params: PostOrderParams,
585 ) -> Result<Value, BitmexHttpError> {
586 let body = serde_urlencoded::to_string(¶ms)
588 .map_err(|e| {
589 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
590 })?
591 .into_bytes();
592 let path = "/order";
593 self.send_request(Method::POST, path, Some(body), true)
594 .await
595 }
596
597 pub async fn http_cancel_orders(
607 &self,
608 params: DeleteOrderParams,
609 ) -> Result<Value, BitmexHttpError> {
610 let body = serde_urlencoded::to_string(¶ms)
612 .map_err(|e| {
613 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
614 })?
615 .into_bytes();
616 let path = "/order";
617 self.send_request(Method::DELETE, path, Some(body), true)
618 .await
619 }
620
621 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
631 let body = serde_urlencoded::to_string(¶ms)
633 .map_err(|e| {
634 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
635 })?
636 .into_bytes();
637 let path = "/order";
638 self.send_request(Method::PUT, path, Some(body), true).await
639 }
640
641 pub async fn http_cancel_all_orders(
655 &self,
656 params: DeleteAllOrdersParams,
657 ) -> Result<Value, BitmexHttpError> {
658 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
659 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
660 })?;
661 let path = format!("/order/all?{query}");
662 self.send_request(Method::DELETE, &path, None, true).await
663 }
664
665 pub async fn http_get_executions(
675 &self,
676 params: GetExecutionParams,
677 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
678 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
679 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
680 })?;
681 let path = format!("/execution/tradeHistory?{query}");
682 self.send_request(Method::GET, &path, None, true).await
683 }
684
685 pub async fn http_get_positions(
695 &self,
696 params: GetPositionParams,
697 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
698 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
699 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700 })?;
701 let path = format!("/position?{query}");
702 self.send_request(Method::GET, &path, None, true).await
703 }
704
705 pub async fn http_update_position_leverage(
715 &self,
716 params: PostPositionLeverageParams,
717 ) -> Result<BitmexPosition, BitmexHttpError> {
718 let body = serde_urlencoded::to_string(¶ms)
720 .map_err(|e| {
721 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
722 })?
723 .into_bytes();
724 let path = "/position/leverage";
725 self.send_request(Method::POST, path, Some(body), true)
726 .await
727 }
728}
729
730#[derive(Clone, Debug)]
735#[cfg_attr(
736 feature = "python",
737 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
738)]
739pub struct BitmexHttpClient {
740 inner: Arc<BitmexHttpInnerClient>,
741 instruments_cache: Arc<Mutex<AHashMap<Ustr, InstrumentAny>>>,
742}
743
744impl Default for BitmexHttpClient {
745 fn default() -> Self {
746 Self::new(
747 None,
748 None,
749 None,
750 false,
751 Some(60),
752 None,
753 None,
754 None,
755 None,
756 None,
757 None,
758 )
759 .expect("Failed to create default BitmexHttpClient")
760 }
761}
762
763impl BitmexHttpClient {
764 #[allow(clippy::too_many_arguments)]
770 pub fn new(
771 base_url: Option<String>,
772 api_key: Option<String>,
773 api_secret: Option<String>,
774 testnet: bool,
775 timeout_secs: Option<u64>,
776 max_retries: Option<u32>,
777 retry_delay_ms: Option<u64>,
778 retry_delay_max_ms: Option<u64>,
779 recv_window_ms: Option<u64>,
780 max_requests_per_second: Option<u32>,
781 max_requests_per_minute: Option<u32>,
782 ) -> Result<Self, BitmexHttpError> {
783 let url = base_url.unwrap_or_else(|| {
785 if testnet {
786 BITMEX_HTTP_TESTNET_URL.to_string()
787 } else {
788 BITMEX_HTTP_URL.to_string()
789 }
790 });
791
792 let inner = match (api_key, api_secret) {
793 (Some(key), Some(secret)) => BitmexHttpInnerClient::with_credentials(
794 key,
795 secret,
796 url,
797 timeout_secs,
798 max_retries,
799 retry_delay_ms,
800 retry_delay_max_ms,
801 recv_window_ms,
802 max_requests_per_second,
803 max_requests_per_minute,
804 )?,
805 _ => BitmexHttpInnerClient::new(
806 Some(url),
807 timeout_secs,
808 max_retries,
809 retry_delay_ms,
810 retry_delay_max_ms,
811 recv_window_ms,
812 max_requests_per_second,
813 max_requests_per_minute,
814 )?,
815 };
816
817 Ok(Self {
818 inner: Arc::new(inner),
819 instruments_cache: Arc::new(Mutex::new(AHashMap::new())),
820 })
821 }
822
823 pub fn from_env() -> anyhow::Result<Self> {
830 Self::with_credentials(None, None, None, None, None, None, None, None, None, None)
831 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
832 }
833
834 #[allow(clippy::too_many_arguments)]
844 pub fn with_credentials(
845 api_key: Option<String>,
846 api_secret: Option<String>,
847 base_url: Option<String>,
848 timeout_secs: Option<u64>,
849 max_retries: Option<u32>,
850 retry_delay_ms: Option<u64>,
851 retry_delay_max_ms: Option<u64>,
852 recv_window_ms: Option<u64>,
853 max_requests_per_second: Option<u32>,
854 max_requests_per_minute: Option<u32>,
855 ) -> anyhow::Result<Self> {
856 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
858
859 let (key_var, secret_var) = if testnet {
861 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
862 } else {
863 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
864 };
865
866 let api_key = api_key.or_else(|| get_env_var(key_var).ok());
867 let api_secret = api_secret.or_else(|| get_env_var(secret_var).ok());
868
869 if api_key.is_some() && api_secret.is_none() {
871 anyhow::bail!("{secret_var} is required when {key_var} is provided");
872 }
873 if api_key.is_none() && api_secret.is_some() {
874 anyhow::bail!("{key_var} is required when {secret_var} is provided");
875 }
876
877 Self::new(
878 base_url,
879 api_key,
880 api_secret,
881 testnet,
882 timeout_secs,
883 max_retries,
884 retry_delay_ms,
885 retry_delay_max_ms,
886 recv_window_ms,
887 max_requests_per_second,
888 max_requests_per_minute,
889 )
890 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
891 }
892
893 #[must_use]
895 pub fn base_url(&self) -> &str {
896 self.inner.base_url.as_str()
897 }
898
899 #[must_use]
901 pub fn api_key(&self) -> Option<&str> {
902 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
903 }
904
905 pub async fn http_get_server_time(&self) -> Result<u64, BitmexHttpError> {
913 self.inner.http_get_server_time().await
914 }
915
916 fn generate_ts_init(&self) -> UnixNanos {
918 get_atomic_clock_realtime().get_time_ns()
919 }
920
921 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
923 matches!(
924 contingency_type,
925 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
926 )
927 }
928
929 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
931 matches!(
932 contingency_type,
933 ContingencyType::Oco | ContingencyType::Oto
934 )
935 }
936
937 fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
939 let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
940 let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
941 let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
942 let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
943
944 for report in reports.iter() {
945 let Some(client_order_id) = report.client_order_id else {
946 continue;
947 };
948
949 if let Some(order_list_id) = report.order_list_id {
950 order_list_groups
951 .entry(order_list_id)
952 .or_default()
953 .push(client_order_id);
954
955 if Self::is_parent_contingency(report.contingency_type) {
956 order_list_parents
957 .entry(order_list_id)
958 .or_insert(client_order_id);
959 }
960 }
961
962 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
963 && Self::is_contingent_order(report.contingency_type)
964 {
965 prefix_groups
966 .entry(base.to_owned())
967 .or_default()
968 .push(client_order_id);
969
970 if Self::is_parent_contingency(report.contingency_type) {
971 prefix_parents
972 .entry(base.to_owned())
973 .or_insert(client_order_id);
974 }
975 }
976 }
977
978 for report in reports.iter_mut() {
979 let Some(client_order_id) = report.client_order_id else {
980 continue;
981 };
982
983 if report.linked_order_ids.is_some() {
984 continue;
985 }
986
987 if !Self::is_contingent_order(report.contingency_type) {
989 continue;
990 }
991
992 if let Some(order_list_id) = report.order_list_id
993 && let Some(group) = order_list_groups.get(&order_list_id)
994 {
995 let mut linked: Vec<ClientOrderId> = group
996 .iter()
997 .copied()
998 .filter(|candidate| candidate != &client_order_id)
999 .collect();
1000
1001 if !linked.is_empty() {
1002 if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1003 if client_order_id != *parent_id {
1004 linked.sort_by_key(
1005 |candidate| {
1006 if candidate == parent_id { 0 } else { 1 }
1007 },
1008 );
1009 report.parent_order_id = Some(*parent_id);
1010 } else {
1011 report.parent_order_id = None;
1012 }
1013 } else {
1014 report.parent_order_id = None;
1015 }
1016
1017 tracing::trace!(
1018 client_order_id = ?client_order_id,
1019 order_list_id = ?order_list_id,
1020 contingency_type = ?report.contingency_type,
1021 linked_order_ids = ?linked,
1022 "BitMEX linked ids sourced from order list id",
1023 );
1024 report.linked_order_ids = Some(linked);
1025 continue;
1026 }
1027
1028 tracing::trace!(
1029 client_order_id = ?client_order_id,
1030 order_list_id = ?order_list_id,
1031 contingency_type = ?report.contingency_type,
1032 order_list_group = ?group,
1033 "BitMEX order list id group had no peers",
1034 );
1035 report.parent_order_id = None;
1036 } else if report.order_list_id.is_none() {
1037 report.parent_order_id = None;
1038 }
1039
1040 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1041 && let Some(group) = prefix_groups.get(base)
1042 {
1043 let mut linked: Vec<ClientOrderId> = group
1044 .iter()
1045 .copied()
1046 .filter(|candidate| candidate != &client_order_id)
1047 .collect();
1048
1049 if !linked.is_empty() {
1050 if let Some(parent_id) = prefix_parents.get(base) {
1051 if client_order_id != *parent_id {
1052 linked.sort_by_key(
1053 |candidate| {
1054 if candidate == parent_id { 0 } else { 1 }
1055 },
1056 );
1057 report.parent_order_id = Some(*parent_id);
1058 } else {
1059 report.parent_order_id = None;
1060 }
1061 } else {
1062 report.parent_order_id = None;
1063 }
1064
1065 tracing::trace!(
1066 client_order_id = ?client_order_id,
1067 contingency_type = ?report.contingency_type,
1068 base = base,
1069 linked_order_ids = ?linked,
1070 "BitMEX linked ids constructed from client order id prefix",
1071 );
1072 report.linked_order_ids = Some(linked);
1073 continue;
1074 }
1075
1076 tracing::trace!(
1077 client_order_id = ?client_order_id,
1078 contingency_type = ?report.contingency_type,
1079 base = base,
1080 prefix_group = ?group,
1081 "BitMEX client order id prefix group had no peers",
1082 );
1083 report.parent_order_id = None;
1084 } else if client_order_id.as_str().contains('-') {
1085 report.parent_order_id = None;
1086 }
1087
1088 if Self::is_contingent_order(report.contingency_type) {
1089 tracing::warn!(
1090 client_order_id = ?report.client_order_id,
1091 order_list_id = ?report.order_list_id,
1092 contingency_type = ?report.contingency_type,
1093 "BitMEX order status report missing linked ids after grouping",
1094 );
1095 report.contingency_type = ContingencyType::NoContingency;
1096 report.parent_order_id = None;
1097 }
1098
1099 report.linked_order_ids = None;
1100 }
1101 }
1102
1103 pub fn cancel_all_requests(&self) {
1105 self.inner.cancel_all_requests();
1106 }
1107
1108 pub fn cancellation_token(&self) -> CancellationToken {
1110 self.inner.cancellation_token().clone()
1111 }
1112
1113 pub fn add_instrument(&self, instrument: InstrumentAny) {
1119 self.instruments_cache
1120 .lock()
1121 .unwrap()
1122 .insert(instrument.raw_symbol().inner(), instrument);
1123 }
1124
1125 pub async fn request_instrument(
1133 &self,
1134 instrument_id: InstrumentId,
1135 ) -> anyhow::Result<Option<InstrumentAny>> {
1136 let response = self
1137 .inner
1138 .http_get_instrument(instrument_id.symbol.as_str())
1139 .await?;
1140
1141 let instrument = match response {
1142 Some(instrument) => instrument,
1143 None => return Ok(None),
1144 };
1145
1146 let ts_init = self.generate_ts_init();
1147
1148 Ok(parse_instrument_any(&instrument, ts_init))
1149 }
1150
1151 pub async fn request_instruments(
1157 &self,
1158 active_only: bool,
1159 ) -> anyhow::Result<Vec<InstrumentAny>> {
1160 let instruments = self.inner.http_get_instruments(active_only).await?;
1161 let ts_init = self.generate_ts_init();
1162
1163 let mut parsed_instruments = Vec::new();
1164 for inst in instruments {
1165 if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
1166 parsed_instruments.push(instrument_any);
1167 }
1168 }
1169
1170 Ok(parsed_instruments)
1171 }
1172
1173 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1183 let inner = self.inner.clone();
1184 inner.http_get_wallet().await
1185 }
1186
1187 pub async fn get_orders(
1197 &self,
1198 params: GetOrderParams,
1199 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1200 let inner = self.inner.clone();
1201 inner.http_get_orders(params).await
1202 }
1203
1204 pub async fn http_place_order(
1214 &self,
1215 params: PostOrderParams,
1216 ) -> Result<Value, BitmexHttpError> {
1217 let inner = self.inner.clone();
1218 inner.http_place_order(params).await
1219 }
1220
1221 pub async fn http_cancel_orders(
1231 &self,
1232 params: DeleteOrderParams,
1233 ) -> Result<Value, BitmexHttpError> {
1234 let inner = self.inner.clone();
1235 inner.http_cancel_orders(params).await
1236 }
1237
1238 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
1248 let inner = self.inner.clone();
1249 inner.http_amend_order(params).await
1250 }
1251
1252 pub async fn http_cancel_all_orders(
1266 &self,
1267 params: DeleteAllOrdersParams,
1268 ) -> Result<Value, BitmexHttpError> {
1269 let inner = self.inner.clone();
1270 inner.http_cancel_all_orders(params).await
1271 }
1272
1273 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1283 let cache = self.instruments_cache.lock().unwrap();
1284 cache.get(&symbol).cloned().ok_or_else(|| {
1285 anyhow::anyhow!(
1286 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1287 )
1288 })
1289 }
1290
1291 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1298 self.instrument_from_cache(symbol)
1299 .map(|instrument| instrument.price_precision())
1300 }
1301
1302 pub async fn http_get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1308 self.inner
1309 .http_get_margin(currency)
1310 .await
1311 .map_err(|e| anyhow::anyhow!(e))
1312 }
1313
1314 pub async fn request_account_state(
1320 &self,
1321 account_id: AccountId,
1322 ) -> anyhow::Result<AccountState> {
1323 let margin = self
1325 .inner
1326 .http_get_margin("XBt")
1327 .await
1328 .map_err(|e| anyhow::anyhow!(e))?;
1329
1330 let ts_init = nautilus_core::nanos::UnixNanos::from(
1331 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1332 );
1333
1334 let margin_msg = BitmexMarginMsg {
1336 account: margin.account,
1337 currency: margin.currency,
1338 risk_limit: margin.risk_limit,
1339 amount: margin.amount,
1340 prev_realised_pnl: margin.prev_realised_pnl,
1341 gross_comm: margin.gross_comm,
1342 gross_open_cost: margin.gross_open_cost,
1343 gross_open_premium: margin.gross_open_premium,
1344 gross_exec_cost: margin.gross_exec_cost,
1345 gross_mark_value: margin.gross_mark_value,
1346 risk_value: margin.risk_value,
1347 init_margin: margin.init_margin,
1348 maint_margin: margin.maint_margin,
1349 target_excess_margin: margin.target_excess_margin,
1350 realised_pnl: margin.realised_pnl,
1351 unrealised_pnl: margin.unrealised_pnl,
1352 wallet_balance: margin.wallet_balance,
1353 margin_balance: margin.margin_balance,
1354 margin_leverage: margin.margin_leverage,
1355 margin_used_pcnt: margin.margin_used_pcnt,
1356 excess_margin: margin.excess_margin,
1357 available_margin: margin.available_margin,
1358 withdrawable_margin: margin.withdrawable_margin,
1359 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1362 foreign_margin_balance: None,
1363 foreign_requirement: None,
1364 };
1365
1366 parse_account_state(&margin_msg, account_id, ts_init)
1367 }
1368
1369 #[allow(clippy::too_many_arguments)]
1376 pub async fn submit_order(
1377 &self,
1378 instrument_id: InstrumentId,
1379 client_order_id: ClientOrderId,
1380 order_side: OrderSide,
1381 order_type: OrderType,
1382 quantity: Quantity,
1383 time_in_force: TimeInForce,
1384 price: Option<Price>,
1385 trigger_price: Option<Price>,
1386 trigger_type: Option<TriggerType>,
1387 display_qty: Option<Quantity>,
1388 post_only: bool,
1389 reduce_only: bool,
1390 order_list_id: Option<OrderListId>,
1391 contingency_type: Option<ContingencyType>,
1392 ) -> anyhow::Result<OrderStatusReport> {
1393 use crate::common::enums::{
1394 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1395 };
1396
1397 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1398
1399 let mut params = super::query::PostOrderParamsBuilder::default();
1400 params.text(NAUTILUS_TRADER);
1401 params.symbol(instrument_id.symbol.as_str());
1402 params.cl_ord_id(client_order_id.as_str());
1403
1404 let side = BitmexSide::try_from_order_side(order_side)?;
1405 params.side(side);
1406
1407 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1408 params.ord_type(ord_type);
1409
1410 params.order_qty(quantity_to_u32(&quantity, &instrument));
1411
1412 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1413 params.time_in_force(tif);
1414
1415 if let Some(price) = price {
1416 params.price(price.as_f64());
1417 }
1418
1419 if let Some(trigger_price) = trigger_price {
1420 params.stop_px(trigger_price.as_f64());
1421 }
1422
1423 if let Some(display_qty) = display_qty {
1424 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1425 }
1426
1427 if let Some(order_list_id) = order_list_id {
1428 params.cl_ord_link_id(order_list_id.as_str());
1429 }
1430
1431 let mut exec_inst = Vec::new();
1432
1433 if post_only {
1434 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1435 }
1436
1437 if reduce_only {
1438 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1439 }
1440
1441 if trigger_price.is_some()
1442 && let Some(trigger_type) = trigger_type
1443 {
1444 match trigger_type {
1445 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1446 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1447 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1448 _ => {} }
1450 }
1451
1452 if !exec_inst.is_empty() {
1453 params.exec_inst(exec_inst);
1454 }
1455
1456 if let Some(contingency_type) = contingency_type {
1457 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1458 params.contingency_type(bitmex_contingency);
1459 }
1460
1461 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1462
1463 let response = self.inner.http_place_order(params).await?;
1464
1465 let order: BitmexOrder = serde_json::from_value(response)?;
1466
1467 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1468 let reason = order
1469 .ord_rej_reason
1470 .map(|r| r.to_string())
1471 .unwrap_or_else(|| "No reason provided".to_string());
1472 anyhow::bail!("Order rejected: {reason}");
1473 }
1474
1475 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1476 let ts_init = self.generate_ts_init();
1477
1478 parse_order_status_report(&order, &instrument, ts_init)
1479 }
1480
1481 pub async fn cancel_order(
1491 &self,
1492 instrument_id: InstrumentId,
1493 client_order_id: Option<ClientOrderId>,
1494 venue_order_id: Option<VenueOrderId>,
1495 ) -> anyhow::Result<OrderStatusReport> {
1496 let mut params = super::query::DeleteOrderParamsBuilder::default();
1497 params.text(NAUTILUS_TRADER);
1498
1499 if let Some(venue_order_id) = venue_order_id {
1500 params.order_id(vec![venue_order_id.as_str().to_string()]);
1501 } else if let Some(client_order_id) = client_order_id {
1502 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1503 } else {
1504 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1505 }
1506
1507 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1508
1509 let response = self.inner.http_cancel_orders(params).await?;
1510
1511 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1512 let order = orders
1513 .into_iter()
1514 .next()
1515 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1516
1517 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1518 let ts_init = self.generate_ts_init();
1519
1520 parse_order_status_report(&order, &instrument, ts_init)
1521 }
1522
1523 pub async fn cancel_orders(
1533 &self,
1534 instrument_id: InstrumentId,
1535 client_order_ids: Option<Vec<ClientOrderId>>,
1536 venue_order_ids: Option<Vec<VenueOrderId>>,
1537 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1538 let mut params = super::query::DeleteOrderParamsBuilder::default();
1539 params.text(NAUTILUS_TRADER);
1540
1541 if let Some(venue_order_ids) = venue_order_ids {
1544 if venue_order_ids.is_empty() {
1545 anyhow::bail!("venue_order_ids cannot be empty");
1546 }
1547 params.order_id(
1548 venue_order_ids
1549 .iter()
1550 .map(|id| id.to_string())
1551 .collect::<Vec<_>>(),
1552 );
1553 } else if let Some(client_order_ids) = client_order_ids {
1554 if client_order_ids.is_empty() {
1555 anyhow::bail!("client_order_ids cannot be empty");
1556 }
1557 params.cl_ord_id(
1558 client_order_ids
1559 .iter()
1560 .map(|id| id.to_string())
1561 .collect::<Vec<_>>(),
1562 );
1563 } else {
1564 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1565 }
1566
1567 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1568
1569 let response = self.inner.http_cancel_orders(params).await?;
1570
1571 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1572
1573 let ts_init = self.generate_ts_init();
1574 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1575
1576 let mut reports = Vec::new();
1577
1578 for order in orders {
1579 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1580 }
1581
1582 Self::populate_linked_order_ids(&mut reports);
1583
1584 Ok(reports)
1585 }
1586
1587 pub async fn cancel_all_orders(
1597 &self,
1598 instrument_id: InstrumentId,
1599 order_side: Option<OrderSide>,
1600 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1601 let mut params = DeleteAllOrdersParamsBuilder::default();
1602 params.text(NAUTILUS_TRADER);
1603 params.symbol(instrument_id.symbol.as_str());
1604
1605 if let Some(side) = order_side {
1606 let side = BitmexSide::try_from_order_side(side)?;
1607 params.filter(serde_json::json!({
1608 "side": side
1609 }));
1610 }
1611
1612 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1613
1614 let response = self.inner.http_cancel_all_orders(params).await?;
1615
1616 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1617
1618 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1619 let ts_init = self.generate_ts_init();
1620
1621 let mut reports = Vec::new();
1622
1623 for order in orders {
1624 reports.push(parse_order_status_report(&order, &instrument, ts_init)?);
1625 }
1626
1627 Self::populate_linked_order_ids(&mut reports);
1628
1629 Ok(reports)
1630 }
1631
1632 pub async fn modify_order(
1643 &self,
1644 instrument_id: InstrumentId,
1645 client_order_id: Option<ClientOrderId>,
1646 venue_order_id: Option<VenueOrderId>,
1647 quantity: Option<Quantity>,
1648 price: Option<Price>,
1649 trigger_price: Option<Price>,
1650 ) -> anyhow::Result<OrderStatusReport> {
1651 let mut params = PutOrderParamsBuilder::default();
1652 params.text(NAUTILUS_TRADER);
1653
1654 if let Some(venue_order_id) = venue_order_id {
1656 params.order_id(venue_order_id.as_str());
1657 } else if let Some(client_order_id) = client_order_id {
1658 params.orig_cl_ord_id(client_order_id.as_str());
1659 } else {
1660 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1661 }
1662
1663 if let Some(quantity) = quantity {
1664 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1665 params.order_qty(quantity_to_u32(&quantity, &instrument));
1666 }
1667
1668 if let Some(price) = price {
1669 params.price(price.as_f64());
1670 }
1671
1672 if let Some(trigger_price) = trigger_price {
1673 params.stop_px(trigger_price.as_f64());
1674 }
1675
1676 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1677
1678 let response = self.inner.http_amend_order(params).await?;
1679
1680 let order: BitmexOrder = serde_json::from_value(response)?;
1681
1682 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1683 let reason = order
1684 .ord_rej_reason
1685 .map(|r| r.to_string())
1686 .unwrap_or_else(|| "No reason provided".to_string());
1687 anyhow::bail!("Order modification rejected: {reason}");
1688 }
1689
1690 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1691 let ts_init = self.generate_ts_init();
1692
1693 parse_order_status_report(&order, &instrument, ts_init)
1694 }
1695
1696 pub async fn query_order(
1705 &self,
1706 instrument_id: InstrumentId,
1707 client_order_id: Option<ClientOrderId>,
1708 venue_order_id: Option<VenueOrderId>,
1709 ) -> anyhow::Result<Option<OrderStatusReport>> {
1710 let mut params = GetOrderParamsBuilder::default();
1711
1712 let filter_json = if let Some(client_order_id) = client_order_id {
1713 serde_json::json!({
1714 "clOrdID": client_order_id.to_string()
1715 })
1716 } else if let Some(venue_order_id) = venue_order_id {
1717 serde_json::json!({
1718 "orderID": venue_order_id.to_string()
1719 })
1720 } else {
1721 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1722 };
1723
1724 params.filter(filter_json);
1725 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1728
1729 let response = self.inner.http_get_orders(params).await?;
1730
1731 if response.is_empty() {
1732 return Ok(None);
1733 }
1734
1735 let order = &response[0];
1736
1737 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1738 let ts_init = self.generate_ts_init();
1739
1740 let report = parse_order_status_report(order, &instrument, ts_init)?;
1741
1742 Ok(Some(report))
1743 }
1744
1745 pub async fn request_order_status_report(
1754 &self,
1755 instrument_id: InstrumentId,
1756 client_order_id: Option<ClientOrderId>,
1757 venue_order_id: Option<VenueOrderId>,
1758 ) -> anyhow::Result<OrderStatusReport> {
1759 let mut params = GetOrderParamsBuilder::default();
1760 params.symbol(instrument_id.symbol.as_str());
1761
1762 if let Some(venue_order_id) = venue_order_id {
1763 params.filter(serde_json::json!({
1764 "orderID": venue_order_id.as_str()
1765 }));
1766 } else if let Some(client_order_id) = client_order_id {
1767 params.filter(serde_json::json!({
1768 "clOrdID": client_order_id.as_str()
1769 }));
1770 }
1771
1772 params.count(1i32);
1773 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1774
1775 let response = self.inner.http_get_orders(params).await?;
1776
1777 let order = response
1778 .into_iter()
1779 .next()
1780 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1781
1782 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1783 let ts_init = self.generate_ts_init();
1784
1785 parse_order_status_report(&order, &instrument, ts_init)
1786 }
1787
1788 pub async fn request_order_status_reports(
1797 &self,
1798 instrument_id: Option<InstrumentId>,
1799 open_only: bool,
1800 limit: Option<u32>,
1801 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1802 let mut params = GetOrderParamsBuilder::default();
1803
1804 if let Some(instrument_id) = &instrument_id {
1805 params.symbol(instrument_id.symbol.as_str());
1806 }
1807
1808 if open_only {
1809 params.filter(serde_json::json!({
1810 "open": true
1811 }));
1812 }
1813
1814 if let Some(limit) = limit {
1815 params.count(limit as i32);
1816 } else {
1817 params.count(500); }
1819
1820 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1823
1824 let response = self.inner.http_get_orders(params).await?;
1825
1826 let ts_init = self.generate_ts_init();
1827
1828 let mut reports = Vec::new();
1829
1830 for order in response {
1831 let Some(symbol) = order.symbol else {
1833 tracing::warn!("Order response missing symbol, skipping");
1834 continue;
1835 };
1836
1837 let instrument = self.instrument_from_cache(symbol)?;
1838
1839 match parse_order_status_report(&order, &instrument, ts_init) {
1840 Ok(report) => reports.push(report),
1841 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1842 }
1843 }
1844
1845 Self::populate_linked_order_ids(&mut reports);
1846
1847 Ok(reports)
1848 }
1849
1850 pub async fn request_trades(
1856 &self,
1857 instrument_id: InstrumentId,
1858 start: Option<DateTime<Utc>>,
1859 end: Option<DateTime<Utc>>,
1860 limit: Option<u32>,
1861 ) -> anyhow::Result<Vec<TradeTick>> {
1862 let mut params = GetTradeParamsBuilder::default();
1863 params.symbol(instrument_id.symbol.as_str());
1864
1865 if let Some(start) = start {
1866 params.start_time(start);
1867 }
1868
1869 if let Some(end) = end {
1870 params.end_time(end);
1871 }
1872
1873 if let (Some(start), Some(end)) = (start, end) {
1874 anyhow::ensure!(
1875 start < end,
1876 "Invalid time range: start={start:?} end={end:?}",
1877 );
1878 }
1879
1880 if let Some(limit) = limit {
1881 let clamped_limit = limit.min(1000);
1882 if limit > 1000 {
1883 tracing::warn!(
1884 limit,
1885 clamped_limit,
1886 "BitMEX trade request limit exceeds venue maximum; clamping",
1887 );
1888 }
1889 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1890 }
1891 params.reverse(false);
1892 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1893
1894 let response = self.inner.http_get_trades(params).await?;
1895
1896 let ts_init = self.generate_ts_init();
1897
1898 let mut parsed_trades = Vec::new();
1899
1900 for trade in response {
1901 if let Some(start) = start
1902 && trade.timestamp < start
1903 {
1904 continue;
1905 }
1906
1907 if let Some(end) = end
1908 && trade.timestamp > end
1909 {
1910 continue;
1911 }
1912
1913 let price_precision = self.get_price_precision(trade.symbol)?;
1914
1915 match parse_trade(trade, price_precision, ts_init) {
1916 Ok(trade) => parsed_trades.push(trade),
1917 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1918 }
1919 }
1920
1921 Ok(parsed_trades)
1922 }
1923
1924 pub async fn request_bars(
1931 &self,
1932 mut bar_type: BarType,
1933 start: Option<DateTime<Utc>>,
1934 end: Option<DateTime<Utc>>,
1935 limit: Option<u32>,
1936 partial: bool,
1937 ) -> anyhow::Result<Vec<Bar>> {
1938 bar_type = bar_type.standard();
1939
1940 anyhow::ensure!(
1941 bar_type.aggregation_source() == AggregationSource::External,
1942 "Only EXTERNAL aggregation bars are supported"
1943 );
1944 anyhow::ensure!(
1945 bar_type.spec().price_type == PriceType::Last,
1946 "Only LAST price type bars are supported"
1947 );
1948 if let (Some(start), Some(end)) = (start, end) {
1949 anyhow::ensure!(
1950 start < end,
1951 "Invalid time range: start={start:?} end={end:?}"
1952 );
1953 }
1954
1955 let spec = bar_type.spec();
1956 let bin_size = match (spec.aggregation, spec.step.get()) {
1957 (BarAggregation::Minute, 1) => "1m",
1958 (BarAggregation::Minute, 5) => "5m",
1959 (BarAggregation::Hour, 1) => "1h",
1960 (BarAggregation::Day, 1) => "1d",
1961 _ => anyhow::bail!(
1962 "BitMEX does not support {}-{:?}-{:?} bars",
1963 spec.step.get(),
1964 spec.aggregation,
1965 spec.price_type,
1966 ),
1967 };
1968
1969 let instrument_id = bar_type.instrument_id();
1970 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1971
1972 let mut params = GetTradeBucketedParamsBuilder::default();
1973 params.symbol(instrument_id.symbol.as_str());
1974 params.bin_size(bin_size);
1975 if partial {
1976 params.partial(true);
1977 }
1978 if let Some(start) = start {
1979 params.start_time(start);
1980 }
1981 if let Some(end) = end {
1982 params.end_time(end);
1983 }
1984 if let Some(limit) = limit {
1985 let clamped_limit = limit.min(1000);
1986 if limit > 1000 {
1987 tracing::warn!(
1988 limit,
1989 clamped_limit,
1990 "BitMEX bar request limit exceeds venue maximum; clamping",
1991 );
1992 }
1993 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
1994 }
1995 params.reverse(false);
1996 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1997
1998 let response = self.inner.http_get_trade_bucketed(params).await?;
1999 let ts_init = self.generate_ts_init();
2000 let mut bars = Vec::new();
2001
2002 for bin in response {
2003 if let Some(start) = start
2004 && bin.timestamp < start
2005 {
2006 continue;
2007 }
2008 if let Some(end) = end
2009 && bin.timestamp > end
2010 {
2011 continue;
2012 }
2013 if bin.symbol != instrument_id.symbol.inner() {
2014 tracing::warn!(
2015 symbol = %bin.symbol,
2016 expected = %instrument_id.symbol,
2017 "Skipping trade bin for unexpected symbol",
2018 );
2019 continue;
2020 }
2021
2022 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2023 Ok(bar) => bars.push(bar),
2024 Err(e) => tracing::warn!("Failed to parse trade bin: {e}"),
2025 }
2026 }
2027
2028 Ok(bars)
2029 }
2030
2031 pub async fn request_fill_reports(
2037 &self,
2038 instrument_id: Option<InstrumentId>,
2039 limit: Option<u32>,
2040 ) -> anyhow::Result<Vec<FillReport>> {
2041 let mut params = GetExecutionParamsBuilder::default();
2042 if let Some(instrument_id) = instrument_id {
2043 params.symbol(instrument_id.symbol.as_str());
2044 }
2045 if let Some(limit) = limit {
2046 params.count(limit as i32);
2047 } else {
2048 params.count(500); }
2050 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2053
2054 let response = self.inner.http_get_executions(params).await?;
2055
2056 let ts_init = self.generate_ts_init();
2057
2058 let mut reports = Vec::new();
2059
2060 for exec in response {
2061 let Some(symbol) = exec.symbol else {
2063 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2064 continue;
2065 };
2066 let symbol_str = symbol.to_string();
2067
2068 let instrument = match self.instrument_from_cache(symbol) {
2069 Ok(instrument) => instrument,
2070 Err(err) => {
2071 tracing::error!(symbol = %symbol_str, "Instrument not found in cache for execution parsing: {err}");
2072 continue;
2073 }
2074 };
2075
2076 match parse_fill_report(exec, &instrument, ts_init) {
2077 Ok(report) => reports.push(report),
2078 Err(e) => {
2079 let error_msg = e.to_string();
2081 if error_msg.starts_with("Skipping non-trade execution")
2082 || error_msg.starts_with("Skipping execution without order_id")
2083 {
2084 tracing::debug!("{e}");
2085 } else {
2086 tracing::error!("Failed to parse fill report: {e}");
2087 }
2088 }
2089 }
2090 }
2091
2092 Ok(reports)
2093 }
2094
2095 pub async fn request_position_status_reports(
2101 &self,
2102 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2103 let params = GetPositionParamsBuilder::default()
2104 .count(500) .build()
2106 .map_err(|e| anyhow::anyhow!(e))?;
2107
2108 let response = self.inner.http_get_positions(params).await?;
2109
2110 let ts_init = self.generate_ts_init();
2111
2112 let mut reports = Vec::new();
2113
2114 for pos in response {
2115 let symbol = Ustr::from(pos.symbol.as_str());
2116 let instrument = match self.instrument_from_cache(symbol) {
2117 Ok(instrument) => instrument,
2118 Err(err) => {
2119 tracing::error!(
2120 symbol = pos.symbol.as_str(),
2121 "Instrument not found in cache for position parsing: {err}"
2122 );
2123 continue;
2124 }
2125 };
2126
2127 match parse_position_report(pos, &instrument, ts_init) {
2128 Ok(report) => reports.push(report),
2129 Err(e) => tracing::error!("Failed to parse position report: {e}"),
2130 }
2131 }
2132
2133 Ok(reports)
2134 }
2135
2136 pub async fn update_position_leverage(
2144 &self,
2145 symbol: &str,
2146 leverage: f64,
2147 ) -> anyhow::Result<PositionStatusReport> {
2148 let params = PostPositionLeverageParams {
2149 symbol: symbol.to_string(),
2150 leverage,
2151 target_account_id: None,
2152 };
2153
2154 let response = self.inner.http_update_position_leverage(params).await?;
2155
2156 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2157 let ts_init = self.generate_ts_init();
2158
2159 parse_position_report(response, &instrument, ts_init)
2160 }
2161}
2162
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Request path used by every signing test.
    const ORDER_PATH: &str = "/api/v1/order";

    /// Builds an accepted limit-buy report for `XBTUSD.BITMEX` with the given
    /// identifiers, contingency type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let base = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        let listed = match order_list_id {
            Some(id) => base.with_order_list_id(OrderListId::from(id)),
            None => base,
        };

        listed.with_contingency_type(contingency_type)
    }

    /// Creates a credentialed inner client with every optional setting after
    /// the timeout left unset.
    fn default_signing_client() -> BitmexHttpInnerClient {
        BitmexHttpInnerClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create test client")
    }

    /// Returns the three client order IDs `{base}-1`, `{base}-2`, `{base}-3`.
    fn bracket_ids(base: &str) -> [String; 3] {
        [1, 2, 3].map(|leg| format!("{base}-{leg}"))
    }

    /// Wraps the given client order IDs in the shape of `linked_order_ids`.
    fn linked(ids: &[&str]) -> Option<Vec<ClientOrderId>> {
        Some(ids.iter().map(|id| ClientOrderId::from(*id)).collect())
    }

    /// Asserts that each report links to the other two legs of the bracket.
    fn assert_bracket_linked(reports: &[OrderStatusReport], entry: &str, stop: &str, take: &str) {
        assert_eq!(reports[0].linked_order_ids, linked(&[stop, take]));
        assert_eq!(reports[1].linked_order_ids, linked(&[entry, take]));
        assert_eq!(reports[2].linked_order_ids, linked(&[entry, stop]));
    }

    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let client = default_signing_client();

        let headers = client
            .sign_request(&Method::GET, ORDER_PATH, None)
            .unwrap();

        for name in ["api-key", "api-signature", "api-expires"] {
            assert!(headers.contains_key(name));
        }
        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
    }

    #[rstest]
    fn test_sign_request_with_body() {
        let client = default_signing_client();

        let body_bytes = serde_json::to_vec(&json!({"symbol": "XBTUSD", "orderQty": 100})).unwrap();

        let headers_without_body = client
            .sign_request(&Method::POST, ORDER_PATH, None)
            .unwrap();
        let headers_with_body = client
            .sign_request(&Method::POST, ORDER_PATH, Some(&body_bytes))
            .unwrap();

        // The body must contribute to the signature
        assert_ne!(
            headers_without_body.get("api-signature").unwrap(),
            headers_with_body.get("api-signature").unwrap()
        );
    }

    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        // Only the fourth optional argument after the timeout varies; it is
        // presumably the receive window (confirm against `with_credentials`)
        let client_with = |recv_window| {
            BitmexHttpInnerClient::with_credentials(
                "test_api_key".to_string(),
                "test_api_secret".to_string(),
                "http://localhost:8080".to_string(),
                Some(60),
                None,
                None,
                None,
                recv_window,
                None,
                None,
            )
            .expect("Failed to create test client")
        };

        let client_default = client_with(None);
        let client_custom = client_with(Some(30_000));

        let headers_default = client_default
            .sign_request(&Method::GET, ORDER_PATH, None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, ORDER_PATH, None)
            .unwrap();

        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

        // Both expiries lie in the future
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        // The custom setting pushes the expiry later than the default
        assert!(expires_custom > expires_default);

        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let [entry, stop, take] = bracket_ids("O-20250922-002219-001-000");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_bracket_linked(&reports, &entry, &stop, &take);
    }

    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let [entry, stop, take] = bracket_ids("O-20250922-002220-001-000");

        // No order list ID: linkage has to come from the shared ID prefix
        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_bracket_linked(&reports, &entry, &stop, &take);
    }

    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // Neither order ends up linked
        assert!(reports[0].linked_order_ids.is_none());
        assert!(reports[1].linked_order_ids.is_none());

        // The lone contingent order is reset to non-contingent
        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
    }
}