1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock, RwLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 UUID4, UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46 PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65 error::{BitmexErrorResponse, BitmexHttpError},
66 models::{
67 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69 },
70 query::{
71 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder, PostOrderParams,
74 PostPositionLeverageParams, PutOrderParams,
75 },
76};
77use crate::{
78 common::{
79 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80 credential::Credential,
81 enums::{
82 BitmexContingencyType, BitmexExecInstruction, BitmexOrderStatus, BitmexOrderType,
83 BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
84 },
85 parse::{parse_account_balance, quantity_to_u32},
86 },
87 http::{
88 parse::{
89 InstrumentParseResult, parse_fill_report, parse_instrument_any,
90 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
91 },
92 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
93 },
94 websocket::messages::BitmexMarginMsg,
95};
96
/// Per-second request quota used when the caller supplies no per-second limit
/// (also the fallback when a limit of zero is supplied).
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Per-minute request quota used by the credentialed constructor when the caller
/// supplies no per-minute limit (also the fallback when a limit of zero is supplied).
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Per-minute request quota used by the credential-less constructor when the caller
/// supplies no per-minute limit.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate-limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate-limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
108
109static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
110 vec![
111 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
112 Ustr::from(BITMEX_MINUTE_RATE_KEY),
113 ]
114});
115
/// Generic (de)serializable envelope holding a list of `T` under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The wrapped items.
    pub data: Vec<T>,
}
122
/// Low-level BitMEX HTTP client: builds URLs, optionally signs requests, applies
/// rate-limit keys and retries, and deserializes responses.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Prefix prepended to every endpoint path when building the request URL.
    base_url: String,
    /// Underlying HTTP client, configured with the rate-limit quotas.
    client: HttpClient,
    /// API credential; `None` means authenticated requests fail with `MissingCredentials`.
    credential: Option<Credential>,
    /// Receive window in milliseconds; converted to whole seconds when computing `api-expires`.
    recv_window_ms: u64,
    /// Retry policy executor used for every request.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Current cancellation token; swapped for a fresh one by `reset_cancellation_token`.
    cancellation_token: Arc<RwLock<CancellationToken>>,
}
151
152impl Default for BitmexRawHttpClient {
153 fn default() -> Self {
154 Self::new(None, Some(60), None, None, None, None, None, None, None)
155 .expect("Failed to create default BitmexHttpInnerClient")
156 }
157}
158
159impl BitmexRawHttpClient {
160 #[allow(clippy::too_many_arguments)]
170 pub fn new(
171 base_url: Option<String>,
172 timeout_secs: Option<u64>,
173 max_retries: Option<u32>,
174 retry_delay_ms: Option<u64>,
175 retry_delay_max_ms: Option<u64>,
176 recv_window_ms: Option<u64>,
177 max_requests_per_second: Option<u32>,
178 max_requests_per_minute: Option<u32>,
179 proxy_url: Option<String>,
180 ) -> Result<Self, BitmexHttpError> {
181 let retry_config = RetryConfig {
182 max_retries: max_retries.unwrap_or(3),
183 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
184 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
185 backoff_factor: 2.0,
186 jitter_ms: 1000,
187 operation_timeout_ms: Some(60_000),
188 immediate_first: false,
189 max_elapsed_ms: Some(180_000),
190 };
191
192 let retry_manager = RetryManager::new(retry_config);
193
194 let max_req_per_sec =
195 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
196 let max_req_per_min =
197 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
198
199 Ok(Self {
200 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
201 client: HttpClient::new(
202 Self::default_headers(),
203 vec![],
204 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
205 Some(Self::default_quota(max_req_per_sec)),
206 timeout_secs,
207 proxy_url,
208 )
209 .map_err(|e| {
210 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
211 })?,
212 credential: None,
213 recv_window_ms: recv_window_ms.unwrap_or(10_000),
214 retry_manager,
215 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
216 })
217 }
218
219 #[allow(clippy::too_many_arguments)]
226 pub fn with_credentials(
227 api_key: String,
228 api_secret: String,
229 base_url: String,
230 timeout_secs: Option<u64>,
231 max_retries: Option<u32>,
232 retry_delay_ms: Option<u64>,
233 retry_delay_max_ms: Option<u64>,
234 recv_window_ms: Option<u64>,
235 max_requests_per_second: Option<u32>,
236 max_requests_per_minute: Option<u32>,
237 proxy_url: Option<String>,
238 ) -> Result<Self, BitmexHttpError> {
239 let retry_config = RetryConfig {
240 max_retries: max_retries.unwrap_or(3),
241 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243 backoff_factor: 2.0,
244 jitter_ms: 1000,
245 operation_timeout_ms: Some(60_000),
246 immediate_first: false,
247 max_elapsed_ms: Some(180_000),
248 };
249
250 let retry_manager = RetryManager::new(retry_config);
251
252 let max_req_per_sec =
253 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
254 let max_req_per_min =
255 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
256
257 Ok(Self {
258 base_url,
259 client: HttpClient::new(
260 Self::default_headers(),
261 vec![],
262 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min),
263 Some(Self::default_quota(max_req_per_sec)),
264 timeout_secs,
265 proxy_url,
266 )
267 .map_err(|e| {
268 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
269 })?,
270 credential: Some(Credential::new(api_key, api_secret)),
271 recv_window_ms: recv_window_ms.unwrap_or(10_000),
272 retry_manager,
273 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
274 })
275 }
276
277 fn default_headers() -> HashMap<String, String> {
278 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
279 }
280
281 fn default_quota(max_requests_per_second: u32) -> Quota {
282 Quota::per_second(
283 NonZeroU32::new(max_requests_per_second)
284 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
285 )
286 }
287
288 fn rate_limiter_quotas(
289 max_requests_per_second: u32,
290 max_requests_per_minute: u32,
291 ) -> Vec<(String, Quota)> {
292 let per_sec_quota = Quota::per_second(
293 NonZeroU32::new(max_requests_per_second)
294 .unwrap_or_else(|| NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()),
295 );
296 let per_min_quota =
297 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
298 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED).unwrap()
299 }));
300
301 vec![
302 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
303 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
304 ]
305 }
306
307 fn rate_limit_keys() -> Vec<Ustr> {
308 RATE_LIMIT_KEYS.clone()
309 }
310
311 pub fn cancel_all_requests(&self) {
317 self.cancellation_token
318 .read()
319 .expect("cancellation token lock poisoned")
320 .cancel();
321 }
322
323 pub fn reset_cancellation_token(&self) {
329 *self
330 .cancellation_token
331 .write()
332 .expect("cancellation token lock poisoned") = CancellationToken::new();
333 }
334
335 pub fn cancellation_token(&self) -> CancellationToken {
341 self.cancellation_token
342 .read()
343 .expect("cancellation token lock poisoned")
344 .clone()
345 }
346
347 fn sign_request(
348 &self,
349 method: &Method,
350 endpoint: &str,
351 body: Option<&[u8]>,
352 ) -> Result<HashMap<String, String>, BitmexHttpError> {
353 let credential = self
354 .credential
355 .as_ref()
356 .ok_or(BitmexHttpError::MissingCredentials)?;
357
358 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
359 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
360
361 let full_path = if endpoint.starts_with("/api/v1") {
362 endpoint.to_string()
363 } else {
364 format!("/api/v1{endpoint}")
365 };
366
367 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
368
369 let mut headers = HashMap::new();
370 headers.insert("api-expires".to_string(), expires.to_string());
371 headers.insert("api-key".to_string(), credential.api_key.to_string());
372 headers.insert("api-signature".to_string(), signature);
373
374 if body.is_some()
376 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
377 {
378 headers.insert(
379 "Content-Type".to_string(),
380 "application/x-www-form-urlencoded".to_string(),
381 );
382 }
383
384 Ok(headers)
385 }
386
387 async fn send_request<T: DeserializeOwned, P: Serialize>(
388 &self,
389 method: Method,
390 endpoint: &str,
391 params: Option<&P>,
392 body: Option<Vec<u8>>,
393 authenticate: bool,
394 ) -> Result<T, BitmexHttpError> {
395 let endpoint = endpoint.to_string();
396 let method_clone = method.clone();
397 let body_clone = body.clone();
398
399 let params_str = if method == Method::GET || method == Method::DELETE {
402 params
403 .map(serde_urlencoded::to_string)
404 .transpose()
405 .map_err(|e| {
406 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
407 })?
408 } else {
409 None
410 };
411
412 let full_endpoint = match params_str {
413 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
414 _ => endpoint.clone(),
415 };
416
417 let url = format!("{}{}", self.base_url, full_endpoint);
418
419 let operation = || {
420 let url = url.clone();
421 let method = method_clone.clone();
422 let body = body_clone.clone();
423 let full_endpoint = full_endpoint.clone();
424
425 async move {
426 let headers = if authenticate {
427 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
428 } else {
429 None
430 };
431
432 let rate_keys = Self::rate_limit_keys();
433 let resp = self
434 .client
435 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
436 .await?;
437
438 if resp.status.is_success() {
439 serde_json::from_slice(&resp.body).map_err(Into::into)
440 } else if let Ok(error_resp) =
441 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
442 {
443 Err(error_resp.into())
444 } else {
445 Err(BitmexHttpError::UnexpectedStatus {
446 status: StatusCode::from_u16(resp.status.as_u16())
447 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
448 body: String::from_utf8_lossy(&resp.body).to_string(),
449 })
450 }
451 }
452 };
453
454 let should_retry = |error: &BitmexHttpError| -> bool {
471 match error {
472 BitmexHttpError::NetworkError(_) => true,
473 BitmexHttpError::UnexpectedStatus { status, .. } => {
474 status.as_u16() >= 500 || status.as_u16() == 429
475 }
476 BitmexHttpError::BitmexError {
477 error_name,
478 message,
479 } => {
480 error_name == "RateLimitError"
481 || (error_name == "HTTPError"
482 && message.to_lowercase().contains("rate limit"))
483 }
484 _ => false,
485 }
486 };
487
488 let create_error = |msg: String| -> BitmexHttpError {
489 if msg == "canceled" {
490 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
491 } else {
492 BitmexHttpError::NetworkError(msg)
493 }
494 };
495
496 let cancel_token = self.cancellation_token();
497
498 self.retry_manager
499 .execute_with_retry_with_cancel(
500 endpoint.as_str(),
501 operation,
502 should_retry,
503 create_error,
504 &cancel_token,
505 )
506 .await
507 }
508
509 pub async fn get_instruments(
515 &self,
516 active_only: bool,
517 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
518 let path = if active_only {
519 "/instrument/active"
520 } else {
521 "/instrument"
522 };
523 self.send_request::<_, ()>(Method::GET, path, None, None, false)
524 .await
525 }
526
527 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
537 let response: BitmexApiInfo = self
538 .send_request::<_, ()>(Method::GET, "", None, None, false)
539 .await?;
540 Ok(response.timestamp)
541 }
542
543 pub async fn get_instrument(
554 &self,
555 symbol: &str,
556 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
557 let path = &format!("/instrument?symbol={symbol}");
558 let instruments: Vec<BitmexInstrument> = self
559 .send_request::<_, ()>(Method::GET, path, None, None, false)
560 .await?;
561
562 Ok(instruments.into_iter().next())
563 }
564
565 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
571 let endpoint = "/user/wallet";
572 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
573 .await
574 }
575
576 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
582 let path = format!("/user/margin?currency={currency}");
583 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
584 .await
585 }
586
587 pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
593 self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
594 .await
595 }
596
597 pub async fn get_trades(
607 &self,
608 params: GetTradeParams,
609 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
610 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
611 .await
612 }
613
614 pub async fn get_trade_bucketed(
620 &self,
621 params: GetTradeBucketedParams,
622 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
623 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
624 .await
625 }
626
627 pub async fn get_orders(
637 &self,
638 params: GetOrderParams,
639 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
640 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
641 .await
642 }
643
644 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
654 let body = serde_urlencoded::to_string(¶ms)
656 .map_err(|e| {
657 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
658 })?
659 .into_bytes();
660 let path = "/order";
661 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
662 .await
663 }
664
665 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
675 let body = serde_urlencoded::to_string(¶ms)
677 .map_err(|e| {
678 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
679 })?
680 .into_bytes();
681 let path = "/order";
682 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
683 .await
684 }
685
686 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
696 let body = serde_urlencoded::to_string(¶ms)
698 .map_err(|e| {
699 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
700 })?
701 .into_bytes();
702 let path = "/order";
703 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
704 .await
705 }
706
707 pub async fn cancel_all_orders(
721 &self,
722 params: DeleteAllOrdersParams,
723 ) -> Result<Value, BitmexHttpError> {
724 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
725 .await
726 }
727
728 pub async fn get_executions(
738 &self,
739 params: GetExecutionParams,
740 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
741 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
742 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
743 })?;
744 let path = format!("/execution/tradeHistory?{query}");
745 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
746 .await
747 }
748
749 pub async fn get_positions(
759 &self,
760 params: GetPositionParams,
761 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
762 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
763 .await
764 }
765
766 pub async fn update_position_leverage(
776 &self,
777 params: PostPositionLeverageParams,
778 ) -> Result<BitmexPosition, BitmexHttpError> {
779 let body = serde_urlencoded::to_string(¶ms)
781 .map_err(|e| {
782 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
783 })?
784 .into_bytes();
785 let path = "/position/leverage";
786 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
787 .await
788 }
789}
790
/// Higher-level BitMEX HTTP client wrapping a shared [`BitmexRawHttpClient`] and
/// adding an instrument cache.
///
/// Exposed as a Python class when the `python` feature is enabled.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
)]
pub struct BitmexHttpClient {
    /// Shared raw client that performs the actual requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Instruments keyed by raw symbol; shared between clones.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID; shared between clones.
    // NOTE(review): population and lookup of this cache are outside this view.
    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Set to `true` once an instrument has been cached through `cache_instrument`.
    cache_initialized: AtomicBool,
}
806
807impl Clone for BitmexHttpClient {
808 fn clone(&self) -> Self {
809 let cache_initialized = AtomicBool::new(false);
810
811 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
812 if is_initialized {
813 cache_initialized.store(true, Ordering::Release);
814 }
815
816 Self {
817 inner: self.inner.clone(),
818 instruments_cache: self.instruments_cache.clone(),
819 order_type_cache: self.order_type_cache.clone(),
820 cache_initialized,
821 }
822 }
823}
824
825impl Default for BitmexHttpClient {
826 fn default() -> Self {
827 Self::new(
828 None,
829 None,
830 None,
831 false,
832 Some(60),
833 None,
834 None,
835 None,
836 None,
837 None,
838 None,
839 None, )
841 .expect("Failed to create default BitmexHttpClient")
842 }
843}
844
845impl BitmexHttpClient {
846 #[allow(clippy::too_many_arguments)]
852 pub fn new(
853 base_url: Option<String>,
854 api_key: Option<String>,
855 api_secret: Option<String>,
856 testnet: bool,
857 timeout_secs: Option<u64>,
858 max_retries: Option<u32>,
859 retry_delay_ms: Option<u64>,
860 retry_delay_max_ms: Option<u64>,
861 recv_window_ms: Option<u64>,
862 max_requests_per_second: Option<u32>,
863 max_requests_per_minute: Option<u32>,
864 proxy_url: Option<String>,
865 ) -> Result<Self, BitmexHttpError> {
866 let url = base_url.unwrap_or_else(|| {
868 if testnet {
869 BITMEX_HTTP_TESTNET_URL.to_string()
870 } else {
871 BITMEX_HTTP_URL.to_string()
872 }
873 });
874
875 let inner = match (api_key, api_secret) {
876 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
877 key,
878 secret,
879 url,
880 timeout_secs,
881 max_retries,
882 retry_delay_ms,
883 retry_delay_max_ms,
884 recv_window_ms,
885 max_requests_per_second,
886 max_requests_per_minute,
887 proxy_url,
888 )?,
889 (Some(_), None) | (None, Some(_)) => {
890 return Err(BitmexHttpError::ValidationError(
891 "Both api_key and api_secret must be provided, or neither".to_string(),
892 ));
893 }
894 (None, None) => BitmexRawHttpClient::new(
895 Some(url),
896 timeout_secs,
897 max_retries,
898 retry_delay_ms,
899 retry_delay_max_ms,
900 recv_window_ms,
901 max_requests_per_second,
902 max_requests_per_minute,
903 proxy_url,
904 )?,
905 };
906
907 Ok(Self {
908 inner: Arc::new(inner),
909 instruments_cache: Arc::new(DashMap::new()),
910 order_type_cache: Arc::new(DashMap::new()),
911 cache_initialized: AtomicBool::new(false),
912 })
913 }
914
915 pub fn from_env() -> anyhow::Result<Self> {
922 Self::with_credentials(
923 None, None, None, None, None, None, None, None, None, None, None,
924 )
925 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
926 }
927
928 #[allow(clippy::too_many_arguments)]
938 pub fn with_credentials(
939 api_key: Option<String>,
940 api_secret: Option<String>,
941 base_url: Option<String>,
942 timeout_secs: Option<u64>,
943 max_retries: Option<u32>,
944 retry_delay_ms: Option<u64>,
945 retry_delay_max_ms: Option<u64>,
946 recv_window_ms: Option<u64>,
947 max_requests_per_second: Option<u32>,
948 max_requests_per_minute: Option<u32>,
949 proxy_url: Option<String>,
950 ) -> anyhow::Result<Self> {
951 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
953
954 let (key_var, secret_var) = if testnet {
956 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
957 } else {
958 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
959 };
960
961 let api_key = get_or_env_var_opt(api_key, key_var);
962 let api_secret = get_or_env_var_opt(api_secret, secret_var);
963
964 if api_key.is_some() && api_secret.is_none() {
966 anyhow::bail!("{secret_var} is required when {key_var} is provided");
967 }
968 if api_key.is_none() && api_secret.is_some() {
969 anyhow::bail!("{key_var} is required when {secret_var} is provided");
970 }
971
972 Self::new(
973 base_url,
974 api_key,
975 api_secret,
976 testnet,
977 timeout_secs,
978 max_retries,
979 retry_delay_ms,
980 retry_delay_max_ms,
981 recv_window_ms,
982 max_requests_per_second,
983 max_requests_per_minute,
984 proxy_url,
985 )
986 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
987 }
988
989 #[must_use]
991 pub fn base_url(&self) -> &str {
992 self.inner.base_url.as_str()
993 }
994
995 #[must_use]
997 pub fn api_key(&self) -> Option<&str> {
998 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
999 }
1000
1001 #[must_use]
1003 pub fn api_key_masked(&self) -> Option<String> {
1004 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1005 }
1006
1007 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1015 self.inner.get_server_time().await
1016 }
1017
1018 fn generate_ts_init(&self) -> UnixNanos {
1020 get_atomic_clock_realtime().get_time_ns()
1021 }
1022
1023 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1025 matches!(
1026 contingency_type,
1027 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1028 )
1029 }
1030
1031 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1033 matches!(
1034 contingency_type,
1035 ContingencyType::Oco | ContingencyType::Oto
1036 )
1037 }
1038
    /// Fills in `linked_order_ids` and `parent_order_id` on contingent order reports.
    ///
    /// Pass 1 groups reports two ways: by `order_list_id`, and (for contingent
    /// orders only) by the client order ID prefix before its last `-`. In each
    /// group the first report with a parent contingency type (OCO or OTO) is
    /// recorded as the group's parent.
    ///
    /// Pass 2 visits each contingent report that has a client order ID and no
    /// linked IDs yet. It links the report to the other members of its order-list
    /// group, or failing that its prefix group. If neither group has peers, a
    /// warning is logged and the report is downgraded to `NoContingency` with its
    /// parent and linked IDs cleared.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // Pass 1: build the grouping tables.
        for report in reports.iter() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    // `or_insert` keeps the first parent candidate seen for the list.
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Pass 2: assign linked and parent IDs from the tables.
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Already populated: leave untouched.
            if report.linked_order_ids.is_some() {
                continue;
            }

            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            // First choice: peers sharing the same order list ID.
            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Stable sort on a 0/1 key moves the parent to the front
                            // while preserving the order of the other IDs.
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
                        client_order_id,
                        order_list_id,
                        report.contingency_type,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                log::trace!(
                    "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
                    client_order_id,
                    order_list_id,
                    report.contingency_type,
                    group,
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Fallback: peers sharing the same client order ID prefix.
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Same parent-first ordering as in the order-list branch.
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
                        client_order_id,
                        report.contingency_type,
                        base,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                log::trace!(
                    "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
                    client_order_id,
                    report.contingency_type,
                    base,
                    group,
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // No peers found by either method: downgrade the report. Non-contingent
            // reports were skipped above, so this condition holds whenever reached.
            if Self::is_contingent_order(report.contingency_type) {
                log::warn!(
                    "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
                    report.client_order_id,
                    report.order_list_id,
                    report.contingency_type,
                );
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1196
1197 pub fn cancel_all_requests(&self) {
1199 self.inner.cancel_all_requests();
1200 }
1201
1202 pub fn reset_cancellation_token(&self) {
1204 self.inner.reset_cancellation_token();
1205 }
1206
1207 pub fn cancellation_token(&self) -> CancellationToken {
1209 self.inner.cancellation_token()
1210 }
1211
1212 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1216 self.instruments_cache
1217 .insert(instrument.raw_symbol().inner(), instrument);
1218 self.cache_initialized.store(true, Ordering::Release);
1219 }
1220
1221 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1223 self.instruments_cache
1224 .get(symbol)
1225 .map(|entry| entry.value().clone())
1226 }
1227
1228 pub async fn request_instrument(
1236 &self,
1237 instrument_id: InstrumentId,
1238 ) -> anyhow::Result<Option<InstrumentAny>> {
1239 let response = self
1240 .inner
1241 .get_instrument(instrument_id.symbol.as_str())
1242 .await?;
1243
1244 let instrument = match response {
1245 Some(instrument) => instrument,
1246 None => return Ok(None),
1247 };
1248
1249 let ts_init = self.generate_ts_init();
1250
1251 match parse_instrument_any(&instrument, ts_init) {
1252 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1253 InstrumentParseResult::Unsupported {
1254 symbol,
1255 instrument_type,
1256 } => {
1257 log::debug!(
1258 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1259 );
1260 Ok(None)
1261 }
1262 InstrumentParseResult::Inactive { symbol, state } => {
1263 log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1264 Ok(None)
1265 }
1266 InstrumentParseResult::Failed {
1267 symbol,
1268 instrument_type,
1269 error,
1270 } => {
1271 log::error!(
1272 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1273 );
1274 Ok(None)
1275 }
1276 }
1277 }
1278
1279 pub async fn request_instruments(
1285 &self,
1286 active_only: bool,
1287 ) -> anyhow::Result<Vec<InstrumentAny>> {
1288 let instruments = self.inner.get_instruments(active_only).await?;
1289 let ts_init = self.generate_ts_init();
1290
1291 let mut parsed_instruments = Vec::new();
1292 let mut skipped_count = 0;
1293 let mut inactive_count = 0;
1294 let mut failed_count = 0;
1295 let total_count = instruments.len();
1296
1297 for inst in instruments {
1298 match parse_instrument_any(&inst, ts_init) {
1299 InstrumentParseResult::Ok(instrument_any) => {
1300 parsed_instruments.push(*instrument_any);
1301 }
1302 InstrumentParseResult::Unsupported {
1303 symbol,
1304 instrument_type,
1305 } => {
1306 skipped_count += 1;
1307 log::debug!(
1308 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1309 );
1310 }
1311 InstrumentParseResult::Inactive { symbol, state } => {
1312 inactive_count += 1;
1313 log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1314 }
1315 InstrumentParseResult::Failed {
1316 symbol,
1317 instrument_type,
1318 error,
1319 } => {
1320 failed_count += 1;
1321 log::error!(
1322 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1323 );
1324 }
1325 }
1326 }
1327
1328 if skipped_count > 0 {
1329 log::info!(
1330 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1331 );
1332 }
1333
1334 if inactive_count > 0 {
1335 log::info!(
1336 "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1337 );
1338 }
1339
1340 if failed_count > 0 {
1341 log::error!(
1342 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1343 parsed_instruments.len()
1344 );
1345 }
1346
1347 Ok(parsed_instruments)
1348 }
1349
1350 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1360 let inner = self.inner.clone();
1361 inner.get_wallet().await
1362 }
1363
1364 pub async fn get_orders(
1374 &self,
1375 params: GetOrderParams,
1376 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1377 let inner = self.inner.clone();
1378 inner.get_orders(params).await
1379 }
1380
1381 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1387 self.get_instrument(&symbol).ok_or_else(|| {
1388 anyhow::anyhow!(
1389 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1390 )
1391 })
1392 }
1393
1394 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1401 self.instrument_from_cache(symbol)
1402 .map(|instrument| instrument.price_precision())
1403 }
1404
1405 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1411 self.inner
1412 .get_margin(currency)
1413 .await
1414 .map_err(|e| anyhow::anyhow!(e))
1415 }
1416
1417 pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1423 self.inner
1424 .get_all_margins()
1425 .await
1426 .map_err(|e| anyhow::anyhow!(e))
1427 }
1428
1429 pub async fn request_account_state(
1435 &self,
1436 account_id: AccountId,
1437 ) -> anyhow::Result<AccountState> {
1438 let margins = self
1439 .inner
1440 .get_all_margins()
1441 .await
1442 .map_err(|e| anyhow::anyhow!(e))?;
1443
1444 let ts_init =
1445 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1446
1447 let mut balances = Vec::with_capacity(margins.len());
1448 let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1449
1450 for margin in margins {
1451 if let Some(ts) = margin.timestamp {
1452 latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1453 }
1454
1455 let margin_msg = BitmexMarginMsg {
1456 account: margin.account,
1457 currency: margin.currency,
1458 risk_limit: margin.risk_limit,
1459 amount: margin.amount,
1460 prev_realised_pnl: margin.prev_realised_pnl,
1461 gross_comm: margin.gross_comm,
1462 gross_open_cost: margin.gross_open_cost,
1463 gross_open_premium: margin.gross_open_premium,
1464 gross_exec_cost: margin.gross_exec_cost,
1465 gross_mark_value: margin.gross_mark_value,
1466 risk_value: margin.risk_value,
1467 init_margin: margin.init_margin,
1468 maint_margin: margin.maint_margin,
1469 target_excess_margin: margin.target_excess_margin,
1470 realised_pnl: margin.realised_pnl,
1471 unrealised_pnl: margin.unrealised_pnl,
1472 wallet_balance: margin.wallet_balance,
1473 margin_balance: margin.margin_balance,
1474 margin_leverage: margin.margin_leverage,
1475 margin_used_pcnt: margin.margin_used_pcnt,
1476 excess_margin: margin.excess_margin,
1477 available_margin: margin.available_margin,
1478 withdrawable_margin: margin.withdrawable_margin,
1479 maker_fee_discount: None,
1480 taker_fee_discount: None,
1481 timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1482 foreign_margin_balance: None,
1483 foreign_requirement: None,
1484 };
1485
1486 balances.push(parse_account_balance(&margin_msg));
1487 }
1488
1489 if balances.is_empty() {
1490 anyhow::bail!("No margin data returned from BitMEX");
1491 }
1492
1493 let account_type = AccountType::Margin;
1494 let margins_vec = Vec::new();
1495 let is_reported = true;
1496 let event_id = UUID4::new();
1497
1498 let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1500 UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1501 });
1502
1503 Ok(AccountState::new(
1504 account_id,
1505 account_type,
1506 balances,
1507 margins_vec,
1508 is_reported,
1509 event_id,
1510 ts_event,
1511 ts_init,
1512 None,
1513 ))
1514 }
1515
    /// Submits a new order to the venue and returns the parsed order status report.
    ///
    /// Builds the order parameters from the arguments, places the order through the inner
    /// client, records the order type in `order_type_cache` keyed by `client_order_id`, and
    /// parses the venue response into an [`OrderStatusReport`].
    ///
    /// Trailing stop orders (with a `trailing_offset`) are sent with the `TrailingStopPeg` peg
    /// price type and a signed offset (negative for sells, positive for buys). When
    /// `peg_price_type` is given, the order type is overridden to `Pegged`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The instrument is not found in the cache.
    /// - `order_side` is `NoOrderSide`.
    /// - The order type, time in force or contingency type cannot be converted.
    /// - A trailing offset type other than `Price` is supplied for a trailing stop order.
    /// - `peg_offset_value` is given without `peg_price_type`.
    /// - `peg_price_type` is given for an order type other than `Limit`.
    /// - The parameters cannot be built, the request fails, or the response cannot be
    ///   deserialized or parsed.
    /// - The venue reports the order as rejected.
    // The parameter list mirrors the individual order fields, hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub async fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        trailing_offset: Option<f64>,
        trailing_offset_type: Option<TrailingOffsetType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
    ) -> anyhow::Result<OrderStatusReport> {
        // Needed up front to convert quantities into venue units
        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;

        let mut params = super::query::PostOrderParamsBuilder::default();
        params.text(NAUTILUS_TRADER);
        params.symbol(instrument_id.symbol.as_str());
        params.cl_ord_id(client_order_id.as_str());

        if order_side == OrderSide::NoOrderSide {
            anyhow::bail!("Order side must be Buy or Sell");
        }
        let side = BitmexSide::from(order_side.as_specified());
        params.side(side);

        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
        params.ord_type(ord_type);

        params.order_qty(quantity_to_u32(&quantity, &instrument));

        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
        params.time_in_force(tif);

        if let Some(price) = price {
            params.price(price.as_f64());
        }

        if let Some(trigger_price) = trigger_price {
            params.stop_px(trigger_price.as_f64());
        }

        if let Some(display_qty) = display_qty {
            params.display_qty(quantity_to_u32(&display_qty, &instrument));
        }

        if let Some(order_list_id) = order_list_id {
            params.cl_ord_link_id(order_list_id.as_str());
        }

        let is_trailing_stop = matches!(
            order_type,
            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
        );

        // Trailing stops are expressed as a pegged price with a signed offset
        if is_trailing_stop && let Some(offset) = trailing_offset {
            if let Some(offset_type) = trailing_offset_type
                && offset_type != TrailingOffsetType::Price
            {
                anyhow::bail!(
                    "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
                );
            }

            params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);

            // Normalize the sign by side: negative for sells, positive for buys
            let signed_offset = match order_side {
                OrderSide::Sell => -offset.abs(),
                OrderSide::Buy => offset.abs(),
                _ => offset,
            };
            params.peg_offset_value(signed_offset);
        }

        // An offset is meaningless without knowing which price it is pegged to
        if peg_price_type.is_none() && peg_offset_value.is_some() {
            anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
        }
        if let Some(peg_type) = peg_price_type {
            if order_type != OrderType::Limit {
                anyhow::bail!(
                    "Pegged orders only supported for LIMIT order type, was {order_type:?}"
                );
            }
            // Overrides the order type (and any peg settings) applied above
            params.ord_type(BitmexOrderType::Pegged);
            params.peg_price_type(peg_type);
            if let Some(offset) = peg_offset_value {
                params.peg_offset_value(offset);
            }
        }

        let mut exec_inst = Vec::new();

        if post_only {
            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
        }

        if reduce_only {
            exec_inst.push(BitmexExecInstruction::ReduceOnly);
        }

        // The trigger type only applies to orders with a trigger price or trailing stops
        if (trigger_price.is_some() || is_trailing_stop)
            && let Some(trigger_type) = trigger_type
        {
            match trigger_type {
                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
                // Other trigger types add no execution instruction
                _ => {}
            }
        }

        if !exec_inst.is_empty() {
            params.exec_inst(exec_inst);
        }

        if let Some(contingency_type) = contingency_type {
            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
            params.contingency_type(bitmex_contingency);
        }

        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;

        let response = self.inner.place_order(params).await?;

        let order: BitmexOrder = serde_json::from_value(response)?;

        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
            let reason = order
                .ord_rej_reason
                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
            anyhow::bail!("Order rejected: {reason}");
        }

        // Remember the original order type so later reports can be parsed against it
        self.order_type_cache.insert(client_order_id, order_type);

        // NOTE(review): this repeats the lookup done at the top of the function — confirm
        // whether re-reading the cache after the request is intentional.
        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
        let ts_init = self.generate_ts_init();

        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
    }
1675
1676 pub async fn cancel_order(
1686 &self,
1687 instrument_id: InstrumentId,
1688 client_order_id: Option<ClientOrderId>,
1689 venue_order_id: Option<VenueOrderId>,
1690 ) -> anyhow::Result<OrderStatusReport> {
1691 let mut params = super::query::DeleteOrderParamsBuilder::default();
1692 params.text(NAUTILUS_TRADER);
1693
1694 if let Some(venue_order_id) = venue_order_id {
1695 params.order_id(vec![venue_order_id.as_str().to_string()]);
1696 } else if let Some(client_order_id) = client_order_id {
1697 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1698 } else {
1699 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1700 }
1701
1702 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1703
1704 let response = self.inner.cancel_orders(params).await?;
1705
1706 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1707 let order = orders
1708 .into_iter()
1709 .next()
1710 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1711
1712 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1713 let ts_init = self.generate_ts_init();
1714
1715 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1716 }
1717
1718 pub async fn cancel_orders(
1728 &self,
1729 instrument_id: InstrumentId,
1730 client_order_ids: Option<Vec<ClientOrderId>>,
1731 venue_order_ids: Option<Vec<VenueOrderId>>,
1732 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1733 let mut params = super::query::DeleteOrderParamsBuilder::default();
1734 params.text(NAUTILUS_TRADER);
1735
1736 if let Some(venue_order_ids) = venue_order_ids {
1739 if venue_order_ids.is_empty() {
1740 anyhow::bail!("venue_order_ids cannot be empty");
1741 }
1742 params.order_id(
1743 venue_order_ids
1744 .iter()
1745 .map(|id| id.to_string())
1746 .collect::<Vec<_>>(),
1747 );
1748 } else if let Some(client_order_ids) = client_order_ids {
1749 if client_order_ids.is_empty() {
1750 anyhow::bail!("client_order_ids cannot be empty");
1751 }
1752 params.cl_ord_id(
1753 client_order_ids
1754 .iter()
1755 .map(|id| id.to_string())
1756 .collect::<Vec<_>>(),
1757 );
1758 } else {
1759 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1760 }
1761
1762 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1763
1764 let response = self.inner.cancel_orders(params).await?;
1765
1766 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1767
1768 let ts_init = self.generate_ts_init();
1769 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1770
1771 let mut reports = Vec::new();
1772
1773 for order in orders {
1774 reports.push(parse_order_status_report(
1775 &order,
1776 &instrument,
1777 &self.order_type_cache,
1778 ts_init,
1779 )?);
1780 }
1781
1782 Self::populate_linked_order_ids(&mut reports);
1783
1784 Ok(reports)
1785 }
1786
1787 pub async fn cancel_all_orders(
1797 &self,
1798 instrument_id: InstrumentId,
1799 order_side: Option<OrderSide>,
1800 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1801 let mut params = DeleteAllOrdersParamsBuilder::default();
1802 params.text(NAUTILUS_TRADER);
1803 params.symbol(instrument_id.symbol.as_str());
1804
1805 if let Some(side) = order_side {
1806 if side == OrderSide::NoOrderSide {
1807 log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1808 } else {
1809 let side = BitmexSide::from(side.as_specified());
1810 params.filter(serde_json::json!({
1811 "side": side
1812 }));
1813 }
1814 }
1815
1816 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1817
1818 let response = self.inner.cancel_all_orders(params).await?;
1819
1820 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1821
1822 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1823 let ts_init = self.generate_ts_init();
1824
1825 let mut reports = Vec::new();
1826
1827 for order in orders {
1828 reports.push(parse_order_status_report(
1829 &order,
1830 &instrument,
1831 &self.order_type_cache,
1832 ts_init,
1833 )?);
1834 }
1835
1836 Self::populate_linked_order_ids(&mut reports);
1837
1838 Ok(reports)
1839 }
1840
1841 pub async fn modify_order(
1852 &self,
1853 instrument_id: InstrumentId,
1854 client_order_id: Option<ClientOrderId>,
1855 venue_order_id: Option<VenueOrderId>,
1856 quantity: Option<Quantity>,
1857 price: Option<Price>,
1858 trigger_price: Option<Price>,
1859 ) -> anyhow::Result<OrderStatusReport> {
1860 let mut params = PutOrderParamsBuilder::default();
1861 params.text(NAUTILUS_TRADER);
1862
1863 if let Some(venue_order_id) = venue_order_id {
1865 params.order_id(venue_order_id.as_str());
1866 } else if let Some(client_order_id) = client_order_id {
1867 params.orig_cl_ord_id(client_order_id.as_str());
1868 } else {
1869 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1870 }
1871
1872 if let Some(quantity) = quantity {
1873 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1874 params.order_qty(quantity_to_u32(&quantity, &instrument));
1875 }
1876
1877 if let Some(price) = price {
1878 params.price(price.as_f64());
1879 }
1880
1881 if let Some(trigger_price) = trigger_price {
1882 params.stop_px(trigger_price.as_f64());
1883 }
1884
1885 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1886
1887 let response = self.inner.amend_order(params).await?;
1888
1889 let order: BitmexOrder = serde_json::from_value(response)?;
1890
1891 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1892 let reason = order
1893 .ord_rej_reason
1894 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1895 anyhow::bail!("Order modification rejected: {reason}");
1896 }
1897
1898 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1899 let ts_init = self.generate_ts_init();
1900
1901 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1902 }
1903
1904 pub async fn query_order(
1913 &self,
1914 instrument_id: InstrumentId,
1915 client_order_id: Option<ClientOrderId>,
1916 venue_order_id: Option<VenueOrderId>,
1917 ) -> anyhow::Result<Option<OrderStatusReport>> {
1918 let mut params = GetOrderParamsBuilder::default();
1919
1920 let filter_json = if let Some(client_order_id) = client_order_id {
1921 serde_json::json!({
1922 "clOrdID": client_order_id.to_string()
1923 })
1924 } else if let Some(venue_order_id) = venue_order_id {
1925 serde_json::json!({
1926 "orderID": venue_order_id.to_string()
1927 })
1928 } else {
1929 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1930 };
1931
1932 params.filter(filter_json);
1933 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1936
1937 let response = self.inner.get_orders(params).await?;
1938
1939 if response.is_empty() {
1940 return Ok(None);
1941 }
1942
1943 let order = &response[0];
1944
1945 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1946 let ts_init = self.generate_ts_init();
1947
1948 let report =
1949 parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1950
1951 Ok(Some(report))
1952 }
1953
1954 pub async fn request_order_status_report(
1963 &self,
1964 instrument_id: InstrumentId,
1965 client_order_id: Option<ClientOrderId>,
1966 venue_order_id: Option<VenueOrderId>,
1967 ) -> anyhow::Result<OrderStatusReport> {
1968 if venue_order_id.is_none() && client_order_id.is_none() {
1969 anyhow::bail!("Either venue_order_id or client_order_id must be provided");
1970 }
1971
1972 let mut params = GetOrderParamsBuilder::default();
1973 params.symbol(instrument_id.symbol.as_str());
1974
1975 if let Some(venue_order_id) = venue_order_id {
1976 params.filter(serde_json::json!({
1977 "orderID": venue_order_id.as_str()
1978 }));
1979 } else if let Some(client_order_id) = client_order_id {
1980 params.filter(serde_json::json!({
1981 "clOrdID": client_order_id.as_str()
1982 }));
1983 }
1984
1985 params.count(1i32);
1986 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1987
1988 let response = self.inner.get_orders(params).await?;
1989
1990 let order = response
1991 .into_iter()
1992 .next()
1993 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1994
1995 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1996 let ts_init = self.generate_ts_init();
1997
1998 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1999 }
2000
2001 pub async fn request_order_status_reports(
2010 &self,
2011 instrument_id: Option<InstrumentId>,
2012 open_only: bool,
2013 start: Option<DateTime<Utc>>,
2014 end: Option<DateTime<Utc>>,
2015 limit: Option<u32>,
2016 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2017 if let (Some(start), Some(end)) = (start, end) {
2018 anyhow::ensure!(
2019 start < end,
2020 "Invalid time range: start={start:?} end={end:?}",
2021 );
2022 }
2023
2024 let mut params = GetOrderParamsBuilder::default();
2025
2026 if let Some(instrument_id) = &instrument_id {
2027 params.symbol(instrument_id.symbol.as_str());
2028 }
2029
2030 if open_only {
2031 params.filter(serde_json::json!({
2032 "open": true
2033 }));
2034 }
2035
2036 if let Some(start) = start {
2037 params.start_time(start);
2038 }
2039
2040 if let Some(end) = end {
2041 params.end_time(end);
2042 }
2043
2044 if let Some(limit) = limit {
2045 params.count(limit as i32);
2046 } else {
2047 params.count(500); }
2049
2050 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2053
2054 let response = self.inner.get_orders(params).await?;
2055
2056 let ts_init = self.generate_ts_init();
2057
2058 let mut reports = Vec::new();
2059
2060 for order in response {
2061 if let Some(start) = start {
2062 match order.timestamp {
2063 Some(timestamp) if timestamp < start => continue,
2064 Some(_) => {}
2065 None => {
2066 log::debug!("Skipping order report without timestamp for bounded query");
2067 continue;
2068 }
2069 }
2070 }
2071
2072 if let Some(end) = end {
2073 match order.timestamp {
2074 Some(timestamp) if timestamp > end => continue,
2075 Some(_) => {}
2076 None => {
2077 log::debug!("Skipping order report without timestamp for bounded query");
2078 continue;
2079 }
2080 }
2081 }
2082
2083 let Some(symbol) = order.symbol else {
2085 log::warn!("Order response missing symbol, skipping");
2086 continue;
2087 };
2088
2089 let Ok(instrument) = self.instrument_from_cache(symbol) else {
2090 log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2091 continue;
2092 };
2093
2094 match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2095 Ok(report) => reports.push(report),
2096 Err(e) => log::error!("Failed to parse order status report: {e}"),
2097 }
2098 }
2099
2100 Self::populate_linked_order_ids(&mut reports);
2101
2102 Ok(reports)
2103 }
2104
2105 pub async fn request_trades(
2111 &self,
2112 instrument_id: InstrumentId,
2113 start: Option<DateTime<Utc>>,
2114 end: Option<DateTime<Utc>>,
2115 limit: Option<u32>,
2116 ) -> anyhow::Result<Vec<TradeTick>> {
2117 let mut params = GetTradeParamsBuilder::default();
2118 params.symbol(instrument_id.symbol.as_str());
2119
2120 if let Some(start) = start {
2121 params.start_time(start);
2122 }
2123
2124 if let Some(end) = end {
2125 params.end_time(end);
2126 }
2127
2128 if let (Some(start), Some(end)) = (start, end) {
2129 anyhow::ensure!(
2130 start < end,
2131 "Invalid time range: start={start:?} end={end:?}",
2132 );
2133 }
2134
2135 if let Some(limit) = limit {
2136 let clamped_limit = limit.min(1000);
2137 if limit > 1000 {
2138 log::warn!(
2139 "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2140 );
2141 }
2142 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2143 }
2144 params.reverse(false);
2145 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2146
2147 let response = self.inner.get_trades(params).await?;
2148
2149 let ts_init = self.generate_ts_init();
2150
2151 let mut parsed_trades = Vec::new();
2152
2153 for trade in response {
2154 if let Some(start) = start
2155 && trade.timestamp < start
2156 {
2157 continue;
2158 }
2159
2160 if let Some(end) = end
2161 && trade.timestamp > end
2162 {
2163 continue;
2164 }
2165
2166 let Some(instrument) = self.get_instrument(&trade.symbol) else {
2167 log::error!(
2168 "Instrument {} not found in cache, skipping trade",
2169 trade.symbol
2170 );
2171 continue;
2172 };
2173
2174 match parse_trade(trade, &instrument, ts_init) {
2175 Ok(trade) => parsed_trades.push(trade),
2176 Err(e) => log::error!("Failed to parse trade: {e}"),
2177 }
2178 }
2179
2180 Ok(parsed_trades)
2181 }
2182
2183 pub async fn request_bars(
2190 &self,
2191 mut bar_type: BarType,
2192 start: Option<DateTime<Utc>>,
2193 end: Option<DateTime<Utc>>,
2194 limit: Option<u32>,
2195 partial: bool,
2196 ) -> anyhow::Result<Vec<Bar>> {
2197 bar_type = bar_type.standard();
2198
2199 anyhow::ensure!(
2200 bar_type.aggregation_source() == AggregationSource::External,
2201 "Only EXTERNAL aggregation bars are supported"
2202 );
2203 anyhow::ensure!(
2204 bar_type.spec().price_type == PriceType::Last,
2205 "Only LAST price type bars are supported"
2206 );
2207 if let (Some(start), Some(end)) = (start, end) {
2208 anyhow::ensure!(
2209 start < end,
2210 "Invalid time range: start={start:?} end={end:?}"
2211 );
2212 }
2213
2214 let spec = bar_type.spec();
2215 let bin_size = match (spec.aggregation, spec.step.get()) {
2216 (BarAggregation::Minute, 1) => "1m",
2217 (BarAggregation::Minute, 5) => "5m",
2218 (BarAggregation::Hour, 1) => "1h",
2219 (BarAggregation::Day, 1) => "1d",
2220 _ => anyhow::bail!(
2221 "BitMEX does not support {}-{:?}-{:?} bars",
2222 spec.step.get(),
2223 spec.aggregation,
2224 spec.price_type,
2225 ),
2226 };
2227
2228 let instrument_id = bar_type.instrument_id();
2229 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2230
2231 let mut params = GetTradeBucketedParamsBuilder::default();
2232 params.symbol(instrument_id.symbol.as_str());
2233 params.bin_size(bin_size);
2234 if partial {
2235 params.partial(true);
2236 }
2237 if let Some(start) = start {
2238 params.start_time(start);
2239 }
2240 if let Some(end) = end {
2241 params.end_time(end);
2242 }
2243 if let Some(limit) = limit {
2244 let clamped_limit = limit.min(1000);
2245 if limit > 1000 {
2246 log::warn!(
2247 "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2248 );
2249 }
2250 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2251 }
2252 params.reverse(false);
2253 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2254
2255 let response = self.inner.get_trade_bucketed(params).await?;
2256 let ts_init = self.generate_ts_init();
2257 let mut bars = Vec::new();
2258
2259 for bin in response {
2260 if let Some(start) = start
2261 && bin.timestamp < start
2262 {
2263 continue;
2264 }
2265 if let Some(end) = end
2266 && bin.timestamp > end
2267 {
2268 continue;
2269 }
2270 if bin.symbol != instrument_id.symbol.inner() {
2271 log::warn!(
2272 "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2273 bin.symbol,
2274 instrument_id.symbol,
2275 );
2276 continue;
2277 }
2278
2279 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2280 Ok(bar) => bars.push(bar),
2281 Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2282 }
2283 }
2284
2285 Ok(bars)
2286 }
2287
2288 pub async fn request_fill_reports(
2294 &self,
2295 instrument_id: Option<InstrumentId>,
2296 start: Option<DateTime<Utc>>,
2297 end: Option<DateTime<Utc>>,
2298 limit: Option<u32>,
2299 ) -> anyhow::Result<Vec<FillReport>> {
2300 if let (Some(start), Some(end)) = (start, end) {
2301 anyhow::ensure!(
2302 start < end,
2303 "Invalid time range: start={start:?} end={end:?}",
2304 );
2305 }
2306
2307 let mut params = GetExecutionParamsBuilder::default();
2308 if let Some(instrument_id) = instrument_id {
2309 params.symbol(instrument_id.symbol.as_str());
2310 }
2311 if let Some(start) = start {
2312 params.start_time(start);
2313 }
2314 if let Some(end) = end {
2315 params.end_time(end);
2316 }
2317 if let Some(limit) = limit {
2318 params.count(limit as i32);
2319 } else {
2320 params.count(500); }
2322 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2325
2326 let response = self.inner.get_executions(params).await?;
2327
2328 let ts_init = self.generate_ts_init();
2329
2330 let mut reports = Vec::new();
2331
2332 for exec in response {
2333 if let Some(start) = start {
2334 match exec.transact_time {
2335 Some(timestamp) if timestamp < start => continue,
2336 Some(_) => {}
2337 None => {
2338 log::debug!("Skipping fill report without transact_time for bounded query");
2339 continue;
2340 }
2341 }
2342 }
2343
2344 if let Some(end) = end {
2345 match exec.transact_time {
2346 Some(timestamp) if timestamp > end => continue,
2347 Some(_) => {}
2348 None => {
2349 log::debug!("Skipping fill report without transact_time for bounded query");
2350 continue;
2351 }
2352 }
2353 }
2354
2355 let Some(symbol) = exec.symbol else {
2357 log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2358 continue;
2359 };
2360 let symbol_str = symbol.to_string();
2361
2362 let instrument = match self.instrument_from_cache(symbol) {
2363 Ok(instrument) => instrument,
2364 Err(e) => {
2365 log::error!(
2366 "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2367 );
2368 continue;
2369 }
2370 };
2371
2372 match parse_fill_report(exec, &instrument, ts_init) {
2373 Ok(report) => reports.push(report),
2374 Err(e) => {
2375 let error_msg = e.to_string();
2377 if error_msg.starts_with("Skipping non-trade execution")
2378 || error_msg.starts_with("Skipping execution without order_id")
2379 {
2380 log::debug!("{e}");
2381 } else {
2382 log::error!("Failed to parse fill report: {e}");
2383 }
2384 }
2385 }
2386 }
2387
2388 Ok(reports)
2389 }
2390
2391 pub async fn request_position_status_reports(
2397 &self,
2398 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2399 let params = GetPositionParamsBuilder::default()
2400 .count(500) .build()
2402 .map_err(|e| anyhow::anyhow!(e))?;
2403
2404 let response = self.inner.get_positions(params).await?;
2405
2406 let ts_init = self.generate_ts_init();
2407
2408 let mut reports = Vec::new();
2409
2410 for pos in response {
2411 let symbol = pos.symbol;
2412 let instrument = match self.instrument_from_cache(symbol) {
2413 Ok(instrument) => instrument,
2414 Err(e) => {
2415 log::error!(
2416 "Instrument not found in cache for position parsing: symbol={}, {e}",
2417 pos.symbol.as_str(),
2418 );
2419 continue;
2420 }
2421 };
2422
2423 match parse_position_report(pos, &instrument, ts_init) {
2424 Ok(report) => reports.push(report),
2425 Err(e) => log::error!("Failed to parse position report: {e}"),
2426 }
2427 }
2428
2429 Ok(reports)
2430 }
2431
2432 pub async fn update_position_leverage(
2440 &self,
2441 symbol: &str,
2442 leverage: f64,
2443 ) -> anyhow::Result<PositionStatusReport> {
2444 let params = PostPositionLeverageParams {
2445 symbol: symbol.to_string(),
2446 leverage,
2447 target_account_id: None,
2448 };
2449
2450 let response = self.inner.update_position_leverage(params).await?;
2451
2452 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2453 let ts_init = self.generate_ts_init();
2454
2455 parse_position_report(response, &instrument, ts_init)
2456 }
2457}
2458
/// Unit tests for request signing and linked-order-ID population.
#[cfg(test)]
mod tests {
    use nautilus_core::UUID4;
    use nautilus_model::enums::OrderStatus;
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Builds an accepted BUY LIMIT report on `XBTUSD.BITMEX` with the given identifiers,
    /// contingency type and optional order list ID.
    fn build_report(
        client_order_id: &str,
        venue_order_id: &str,
        contingency_type: ContingencyType,
        order_list_id: Option<&str>,
    ) -> OrderStatusReport {
        let mut report = OrderStatusReport::new(
            AccountId::from("BITMEX-1"),
            InstrumentId::from("XBTUSD.BITMEX"),
            Some(ClientOrderId::from(client_order_id)),
            VenueOrderId::from(venue_order_id),
            OrderSide::Buy,
            OrderType::Limit,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::new(100.0, 0),
            Quantity::default(),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            UnixNanos::from(1_u64),
            Some(UUID4::new()),
        );

        if let Some(id) = order_list_id {
            report = report.with_order_list_id(OrderListId::from(id));
        }

        report.with_contingency_type(contingency_type)
    }

    /// Signing a request must produce the three auth headers and echo the API key.
    #[rstest]
    fn test_sign_request_generates_correct_headers() {
        let client = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create test client");

        let headers = client
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        assert!(headers.contains_key("api-key"));
        assert!(headers.contains_key("api-signature"));
        assert!(headers.contains_key("api-expires"));
        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
    }

    /// The signature must differ between a request with and without a body.
    #[rstest]
    fn test_sign_request_with_body() {
        let client = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create test client");

        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
        let body_bytes = serde_json::to_vec(&body).unwrap();

        let headers_without_body = client
            .sign_request(&Method::POST, "/api/v1/order", None)
            .unwrap();
        let headers_with_body = client
            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
            .unwrap();

        assert_ne!(
            headers_without_body.get("api-signature").unwrap(),
            headers_with_body.get("api-signature").unwrap()
        );
    }

    /// A client built with a custom value in the eighth constructor argument must produce a
    /// later `api-expires` than a client built with the default.
    #[rstest]
    fn test_sign_request_uses_custom_recv_window() {
        let client_default = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("Failed to create test client");

        let client_custom = BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            Some(60),
            None,
            None,
            None,
            // Value under test (presumably the recv window in milliseconds — confirm
            // against `with_credentials`)
            Some(30_000),
            None,
            None,
            None,
        )
        .expect("Failed to create test client");

        let headers_default = client_default
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();
        let headers_custom = client_custom
            .sign_request(&Method::GET, "/api/v1/order", None)
            .unwrap();

        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

        // Both expiries are compared against the current Unix timestamp in seconds
        let now = Utc::now().timestamp();
        assert!(expires_default > now);
        assert!(expires_custom > now);

        assert!(expires_custom > expires_default);

        // The custom expiry should land 18 to 25 seconds after the default one; the range
        // leaves slack around the expected difference.
        let diff = expires_custom - expires_default;
        assert!((18..=25).contains(&diff));
    }

    /// Reports sharing an order list ID must each link to the other two orders.
    #[rstest]
    fn test_populate_linked_order_ids_from_order_list() {
        let base = "O-20250922-002219-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(
            reports[0].linked_order_ids,
            Some(vec![
                ClientOrderId::from(stop.as_str()),
                ClientOrderId::from(take.as_str()),
            ]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            Some(vec![
                ClientOrderId::from(entry.as_str()),
                ClientOrderId::from(take.as_str()),
            ]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            Some(vec![
                ClientOrderId::from(entry.as_str()),
                ClientOrderId::from(stop.as_str()),
            ]),
        );
    }

    /// Without an order list ID, contingent reports sharing a client order ID prefix must
    /// still be linked to each other.
    #[rstest]
    fn test_populate_linked_order_ids_from_id_prefix() {
        let base = "O-20250922-002220-001-000";
        let entry = format!("{base}-1");
        let stop = format!("{base}-2");
        let take = format!("{base}-3");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::Oto, None),
            build_report(&stop, "V-2", ContingencyType::Ouo, None),
            build_report(&take, "V-3", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        assert_eq!(
            reports[0].linked_order_ids,
            Some(vec![
                ClientOrderId::from(stop.as_str()),
                ClientOrderId::from(take.as_str()),
            ]),
        );
        assert_eq!(
            reports[1].linked_order_ids,
            Some(vec![
                ClientOrderId::from(entry.as_str()),
                ClientOrderId::from(take.as_str()),
            ]),
        );
        assert_eq!(
            reports[2].linked_order_ids,
            Some(vec![
                ClientOrderId::from(entry.as_str()),
                ClientOrderId::from(stop.as_str()),
            ]),
        );
    }

    /// A non-contingent order must not be linked, and a contingent order left without any
    /// linked peer must be reset to `NoContingency`.
    #[rstest]
    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
        let base = "O-20250922-002221-001-000";
        let entry = format!("{base}-1");
        let passive = format!("{base}-2");

        let mut reports = vec![
            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
            build_report(&passive, "V-2", ContingencyType::Ouo, None),
        ];

        BitmexHttpClient::populate_linked_order_ids(&mut reports);

        // The non-contingent entry order gets no links
        assert!(reports[0].linked_order_ids.is_none());

        // The OUO order has no contingent peer, so it ends up unlinked and non-contingent
        assert!(reports[1].linked_order_ids.is_none());
        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
    }
}