1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{Arc, LazyLock, Mutex},
29};
30
31use ahash::AHashMap;
32use chrono::Utc;
33use nautilus_core::{
34 UnixNanos,
35 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
36 env::get_env_var,
37 time::get_atomic_clock_realtime,
38};
39use nautilus_model::{
40 data::TradeTick,
41 enums::{OrderSide, OrderType, TimeInForce},
42 events::AccountState,
43 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
44 instruments::{Instrument as InstrumentTrait, InstrumentAny},
45 reports::{FillReport, OrderStatusReport, PositionStatusReport},
46 types::{Price, Quantity},
47};
48use nautilus_network::{
49 http::HttpClient,
50 ratelimiter::quota::Quota,
51 retry::{RetryConfig, RetryManager},
52};
53use reqwest::{Method, StatusCode, header::USER_AGENT};
54use serde::{Deserialize, Serialize, de::DeserializeOwned};
55use serde_json::Value;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use super::{
60 error::{BitmexErrorResponse, BitmexHttpError},
61 models::{
62 BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder, BitmexPosition, BitmexTrade,
63 BitmexWallet,
64 },
65 query::{
66 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
67 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeParams,
68 GetTradeParamsBuilder, PostOrderBulkParams, PostOrderParams, PostPositionLeverageParams,
69 PutOrderBulkParams, PutOrderParams,
70 },
71};
72use crate::{
73 common::{
74 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
75 credential::Credential,
76 enums::{BitmexOrderStatus, BitmexSide},
77 parse::{parse_account_state, parse_instrument_id, quantity_to_u32},
78 },
79 http::{
80 parse::{
81 parse_fill_report, parse_instrument_any, parse_order_status_report,
82 parse_position_report, parse_trade,
83 },
84 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
85 },
86 websocket::messages::BitmexMarginMsg,
87};
88
89pub static BITMEX_REST_QUOTA: LazyLock<Quota> =
98 LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).expect("10 is a valid non-zero u32")));
99
100#[derive(Debug, Serialize, Deserialize)]
102pub struct BitmexResponse<T> {
103 pub data: Vec<T>,
105}
106
107#[derive(Debug, Clone)]
127pub struct BitmexHttpInnerClient {
128 base_url: String,
129 client: HttpClient,
130 credential: Option<Credential>,
131 retry_manager: RetryManager<BitmexHttpError>,
132 cancellation_token: CancellationToken,
133}
134
135impl Default for BitmexHttpInnerClient {
136 fn default() -> Self {
137 Self::new(None, Some(60), None, None, None)
138 .expect("Failed to create default BitmexHttpInnerClient")
139 }
140}
141
142impl BitmexHttpInnerClient {
143 pub fn cancel_all_requests(&self) {
145 self.cancellation_token.cancel();
146 }
147
148 pub fn cancellation_token(&self) -> &CancellationToken {
150 &self.cancellation_token
151 }
152 #[allow(clippy::too_many_arguments)]
162 pub fn new(
163 base_url: Option<String>,
164 timeout_secs: Option<u64>,
165 max_retries: Option<u32>,
166 retry_delay_ms: Option<u64>,
167 retry_delay_max_ms: Option<u64>,
168 ) -> Result<Self, BitmexHttpError> {
169 let retry_config = RetryConfig {
170 max_retries: max_retries.unwrap_or(3),
171 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
172 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
173 backoff_factor: 2.0,
174 jitter_ms: 1000,
175 operation_timeout_ms: Some(60_000),
176 immediate_first: false,
177 max_elapsed_ms: Some(180_000),
178 };
179
180 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
181 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
182 })?;
183
184 Ok(Self {
185 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
186 client: HttpClient::new(
187 Self::default_headers(),
188 vec![],
189 vec![],
190 Some(*BITMEX_REST_QUOTA),
191 timeout_secs,
192 ),
193 credential: None,
194 retry_manager,
195 cancellation_token: CancellationToken::new(),
196 })
197 }
198
199 #[allow(clippy::too_many_arguments)]
206 pub fn with_credentials(
207 api_key: String,
208 api_secret: String,
209 base_url: String,
210 timeout_secs: Option<u64>,
211 max_retries: Option<u32>,
212 retry_delay_ms: Option<u64>,
213 retry_delay_max_ms: Option<u64>,
214 ) -> Result<Self, BitmexHttpError> {
215 let retry_config = RetryConfig {
216 max_retries: max_retries.unwrap_or(3),
217 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
218 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
219 backoff_factor: 2.0,
220 jitter_ms: 1000,
221 operation_timeout_ms: Some(60_000),
222 immediate_first: false,
223 max_elapsed_ms: Some(180_000),
224 };
225
226 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
227 BitmexHttpError::NetworkError(format!("Failed to create retry manager: {e}"))
228 })?;
229
230 Ok(Self {
231 base_url,
232 client: HttpClient::new(
233 Self::default_headers(),
234 vec![],
235 vec![],
236 Some(*BITMEX_REST_QUOTA),
237 timeout_secs,
238 ),
239 credential: Some(Credential::new(api_key, api_secret)),
240 retry_manager,
241 cancellation_token: CancellationToken::new(),
242 })
243 }
244
245 fn default_headers() -> HashMap<String, String> {
246 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
247 }
248
249 fn sign_request(
250 &self,
251 method: &Method,
252 endpoint: &str,
253 body: Option<&[u8]>,
254 ) -> Result<HashMap<String, String>, BitmexHttpError> {
255 let credential = self
256 .credential
257 .as_ref()
258 .ok_or(BitmexHttpError::MissingCredentials)?;
259
260 let expires = Utc::now().timestamp() + 10;
261 let body_str = body
262 .and_then(|b| String::from_utf8(b.to_vec()).ok())
263 .unwrap_or_default();
264
265 let full_path = if endpoint.starts_with("/api/v1") {
266 endpoint.to_string()
267 } else {
268 format!("/api/v1{endpoint}")
269 };
270
271 let signature = credential.sign(method.as_str(), &full_path, expires, &body_str);
272
273 let mut headers = HashMap::new();
274 headers.insert("api-expires".to_string(), expires.to_string());
275 headers.insert("api-key".to_string(), credential.api_key.to_string());
276 headers.insert("api-signature".to_string(), signature);
277
278 Ok(headers)
279 }
280
281 async fn send_request<T: DeserializeOwned>(
282 &self,
283 method: Method,
284 endpoint: &str,
285 body: Option<Vec<u8>>,
286 authenticate: bool,
287 ) -> Result<T, BitmexHttpError> {
288 let url = format!("{}{endpoint}", self.base_url);
289 let method_clone = method.clone();
290 let body_clone = body.clone();
291
292 let operation = || {
293 let url = url.clone();
294 let method = method_clone.clone();
295 let body = body_clone.clone();
296 let endpoint = endpoint.to_string();
297
298 async move {
299 let headers = if authenticate {
300 Some(self.sign_request(&method, &endpoint, body.as_deref())?)
301 } else {
302 None
303 };
304
305 let resp = self
306 .client
307 .request(method, url, headers, body, None, None)
308 .await?;
309
310 if resp.status.is_success() {
311 serde_json::from_slice(&resp.body).map_err(Into::into)
312 } else if let Ok(error_resp) =
313 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
314 {
315 Err(error_resp.into())
316 } else {
317 Err(BitmexHttpError::UnexpectedStatus {
318 status: StatusCode::from_u16(resp.status.as_u16())
319 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
320 body: String::from_utf8_lossy(&resp.body).to_string(),
321 })
322 }
323 }
324 };
325
326 let should_retry = |error: &BitmexHttpError| -> bool {
343 match error {
344 BitmexHttpError::NetworkError(_) => true,
345 BitmexHttpError::UnexpectedStatus { status, .. } => {
346 status.as_u16() >= 500 || status.as_u16() == 429
347 }
348 BitmexHttpError::BitmexError {
349 error_name,
350 message,
351 } => {
352 error_name == "RateLimitError"
353 || (error_name == "HTTPError"
354 && message.to_lowercase().contains("rate limit"))
355 }
356 _ => false,
357 }
358 };
359
360 let create_error = |msg: String| -> BitmexHttpError {
361 if msg == "canceled" {
362 BitmexHttpError::NetworkError("Request canceled".to_string())
363 } else {
364 BitmexHttpError::NetworkError(msg)
365 }
366 };
367
368 self.retry_manager
369 .execute_with_retry_with_cancel(
370 endpoint,
371 operation,
372 should_retry,
373 create_error,
374 &self.cancellation_token,
375 )
376 .await
377 }
378
379 pub async fn http_get_instruments(
385 &self,
386 active_only: bool,
387 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
388 let path = if active_only {
389 "/instrument/active"
390 } else {
391 "/instrument"
392 };
393 self.send_request(Method::GET, path, None, false).await
394 }
395
396 pub async fn http_get_instrument(
402 &self,
403 symbol: &str,
404 ) -> Result<BitmexInstrument, BitmexHttpError> {
405 let path = &format!("/instrument?symbol={symbol}");
406 self.send_request(Method::GET, path, None, false).await
407 }
408
409 pub async fn http_get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
415 let endpoint = "/user/wallet";
416 self.send_request(Method::GET, endpoint, None, true).await
417 }
418
419 pub async fn http_get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
425 let path = format!("/user/margin?currency={currency}");
426 self.send_request(Method::GET, &path, None, true).await
427 }
428
429 pub async fn http_get_trades(
439 &self,
440 params: GetTradeParams,
441 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
442 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
443 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
444 })?;
445 let path = format!("/trade?{query}");
446 self.send_request(Method::GET, &path, None, true).await
447 }
448
449 pub async fn http_get_orders(
459 &self,
460 params: GetOrderParams,
461 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
462 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
463 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
464 })?;
465 let path = format!("/order?{query}");
466 self.send_request(Method::GET, &path, None, true).await
467 }
468
469 pub async fn http_place_order(
479 &self,
480 params: PostOrderParams,
481 ) -> Result<Value, BitmexHttpError> {
482 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
483 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
484 })?;
485 let path = format!("/order?{query}");
486 self.send_request(Method::POST, &path, None, true).await
487 }
488
489 pub async fn http_cancel_orders(
499 &self,
500 params: DeleteOrderParams,
501 ) -> Result<Value, BitmexHttpError> {
502 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
503 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
504 })?;
505 let path = format!("/order?{query}");
506 self.send_request(Method::DELETE, &path, None, true).await
507 }
508
509 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
519 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
520 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
521 })?;
522 let path = format!("/order?{query}");
523 self.send_request(Method::PUT, &path, None, true).await
524 }
525
526 pub async fn http_cancel_all_orders(
540 &self,
541 params: DeleteAllOrdersParams,
542 ) -> Result<Value, BitmexHttpError> {
543 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
544 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
545 })?;
546 let path = format!("/order/all?{query}");
547 self.send_request(Method::DELETE, &path, None, true).await
548 }
549
550 pub async fn http_get_executions(
560 &self,
561 params: GetExecutionParams,
562 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
563 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
564 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
565 })?;
566 let path = format!("/execution/tradeHistory?{query}");
567 self.send_request(Method::GET, &path, None, true).await
568 }
569
570 pub async fn http_get_positions(
580 &self,
581 params: GetPositionParams,
582 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
583 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
584 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
585 })?;
586 let path = format!("/position?{query}");
587 self.send_request(Method::GET, &path, None, true).await
588 }
589
590 pub async fn http_place_orders_bulk(
600 &self,
601 params: PostOrderBulkParams,
602 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
603 let body = serde_json::to_vec(¶ms).map_err(|e| {
604 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
605 })?;
606 let path = "/order/bulk";
607 self.send_request(Method::POST, path, Some(body), true)
608 .await
609 }
610
611 pub async fn http_amend_orders_bulk(
621 &self,
622 params: PutOrderBulkParams,
623 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
624 let body = serde_json::to_vec(¶ms).map_err(|e| {
625 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
626 })?;
627 let path = "/order/bulk";
628 self.send_request(Method::PUT, path, Some(body), true).await
629 }
630
631 pub async fn http_update_position_leverage(
641 &self,
642 params: PostPositionLeverageParams,
643 ) -> Result<BitmexPosition, BitmexHttpError> {
644 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
645 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
646 })?;
647 let path = format!("/position/leverage?{query}");
648 self.send_request(Method::POST, &path, None, true).await
649 }
650}
651
652#[derive(Clone, Debug)]
657#[cfg_attr(
658 feature = "python",
659 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
660)]
661pub struct BitmexHttpClient {
662 inner: Arc<BitmexHttpInnerClient>,
663 instruments_cache: Arc<Mutex<AHashMap<Ustr, InstrumentAny>>>,
664}
665
666impl Default for BitmexHttpClient {
667 fn default() -> Self {
668 Self::new(None, None, None, false, Some(60), None, None, None)
669 .expect("Failed to create default BitmexHttpClient")
670 }
671}
672
673impl BitmexHttpClient {
674 #[allow(clippy::too_many_arguments)]
680 pub fn new(
681 base_url: Option<String>,
682 api_key: Option<String>,
683 api_secret: Option<String>,
684 testnet: bool,
685 timeout_secs: Option<u64>,
686 max_retries: Option<u32>,
687 retry_delay_ms: Option<u64>,
688 retry_delay_max_ms: Option<u64>,
689 ) -> Result<Self, BitmexHttpError> {
690 let url = base_url.unwrap_or_else(|| {
692 if testnet {
693 BITMEX_HTTP_TESTNET_URL.to_string()
694 } else {
695 BITMEX_HTTP_URL.to_string()
696 }
697 });
698
699 let inner = match (api_key, api_secret) {
700 (Some(key), Some(secret)) => BitmexHttpInnerClient::with_credentials(
701 key,
702 secret,
703 url,
704 timeout_secs,
705 max_retries,
706 retry_delay_ms,
707 retry_delay_max_ms,
708 )?,
709 _ => BitmexHttpInnerClient::new(
710 Some(url),
711 timeout_secs,
712 max_retries,
713 retry_delay_ms,
714 retry_delay_max_ms,
715 )?,
716 };
717
718 Ok(Self {
719 inner: Arc::new(inner),
720 instruments_cache: Arc::new(Mutex::new(AHashMap::new())),
721 })
722 }
723
724 pub fn from_env() -> anyhow::Result<Self> {
731 Self::with_credentials(None, None, None, None, None, None, None)
732 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
733 }
734
735 #[allow(clippy::too_many_arguments)]
745 pub fn with_credentials(
746 api_key: Option<String>,
747 api_secret: Option<String>,
748 base_url: Option<String>,
749 timeout_secs: Option<u64>,
750 max_retries: Option<u32>,
751 retry_delay_ms: Option<u64>,
752 retry_delay_max_ms: Option<u64>,
753 ) -> anyhow::Result<Self> {
754 let api_key = api_key.or_else(|| get_env_var("BITMEX_API_KEY").ok());
755 let api_secret = api_secret.or_else(|| get_env_var("BITMEX_API_SECRET").ok());
756
757 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
759
760 if api_key.is_some() && api_secret.is_none() {
762 anyhow::bail!("BITMEX_API_SECRET is required when BITMEX_API_KEY is provided");
763 }
764 if api_key.is_none() && api_secret.is_some() {
765 anyhow::bail!("BITMEX_API_KEY is required when BITMEX_API_SECRET is provided");
766 }
767
768 Self::new(
769 base_url,
770 api_key,
771 api_secret,
772 testnet,
773 timeout_secs,
774 max_retries,
775 retry_delay_ms,
776 retry_delay_max_ms,
777 )
778 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
779 }
780
781 #[must_use]
783 pub fn base_url(&self) -> &str {
784 self.inner.base_url.as_str()
785 }
786
787 #[must_use]
789 pub fn api_key(&self) -> Option<&str> {
790 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
791 }
792
793 fn generate_ts_init(&self) -> UnixNanos {
795 get_atomic_clock_realtime().get_time_ns()
796 }
797
798 pub fn cancel_all_requests(&self) {
800 self.inner.cancel_all_requests();
801 }
802
803 pub fn cancellation_token(&self) -> CancellationToken {
805 self.inner.cancellation_token().clone()
806 }
807
808 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
814 self.instruments_cache
815 .lock()
816 .unwrap()
817 .insert(instrument.raw_symbol().inner(), instrument);
818 }
819
820 pub async fn request_instrument(
826 &self,
827 instrument_id: InstrumentId,
828 ) -> anyhow::Result<Option<InstrumentAny>> {
829 let response = self
830 .inner
831 .http_get_instrument(instrument_id.symbol.as_str())
832 .await?;
833
834 let ts_init = self.generate_ts_init();
835
836 Ok(parse_instrument_any(&response, ts_init))
837 }
838
839 pub async fn request_instruments(
845 &self,
846 active_only: bool,
847 ) -> anyhow::Result<Vec<InstrumentAny>> {
848 let instruments = self.inner.http_get_instruments(active_only).await?;
849 let ts_init = self.generate_ts_init();
850
851 let mut parsed_instruments = Vec::new();
852 for inst in instruments {
853 if let Some(instrument_any) = parse_instrument_any(&inst, ts_init) {
854 parsed_instruments.push(instrument_any);
855 }
856 }
857
858 Ok(parsed_instruments)
859 }
860
861 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
871 let inner = self.inner.clone();
872 inner.http_get_wallet().await
873 }
874
875 pub async fn get_orders(
885 &self,
886 params: GetOrderParams,
887 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
888 let inner = self.inner.clone();
889 inner.http_get_orders(params).await
890 }
891
892 pub async fn http_place_order(
902 &self,
903 params: PostOrderParams,
904 ) -> Result<Value, BitmexHttpError> {
905 let inner = self.inner.clone();
906 inner.http_place_order(params).await
907 }
908
909 pub async fn http_cancel_orders(
919 &self,
920 params: DeleteOrderParams,
921 ) -> Result<Value, BitmexHttpError> {
922 let inner = self.inner.clone();
923 inner.http_cancel_orders(params).await
924 }
925
926 pub async fn http_amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
936 let inner = self.inner.clone();
937 inner.http_amend_order(params).await
938 }
939
940 pub async fn http_cancel_all_orders(
954 &self,
955 params: DeleteAllOrdersParams,
956 ) -> Result<Value, BitmexHttpError> {
957 let inner = self.inner.clone();
958 inner.http_cancel_all_orders(params).await
959 }
960
961 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
971 let cache = self.instruments_cache.lock().unwrap();
972 cache
973 .get(&symbol)
974 .map(|inst| inst.price_precision())
975 .ok_or_else(|| {
976 anyhow::anyhow!(
977 "Instrument {symbol} not found in cache, ensure instruments loaded first"
978 )
979 })
980 }
981
982 pub async fn http_get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
988 self.inner
989 .http_get_margin(currency)
990 .await
991 .map_err(|e| anyhow::anyhow!(e))
992 }
993
994 pub async fn request_account_state(
1000 &self,
1001 account_id: AccountId,
1002 ) -> anyhow::Result<AccountState> {
1003 let margin = self
1005 .inner
1006 .http_get_margin("XBt")
1007 .await
1008 .map_err(|e| anyhow::anyhow!(e))?;
1009
1010 let ts_init = nautilus_core::nanos::UnixNanos::from(
1011 chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64,
1012 );
1013
1014 let margin_msg = BitmexMarginMsg {
1016 account: margin.account,
1017 currency: margin.currency,
1018 risk_limit: margin.risk_limit,
1019 amount: margin.amount,
1020 prev_realised_pnl: margin.prev_realised_pnl,
1021 gross_comm: margin.gross_comm,
1022 gross_open_cost: margin.gross_open_cost,
1023 gross_open_premium: margin.gross_open_premium,
1024 gross_exec_cost: margin.gross_exec_cost,
1025 gross_mark_value: margin.gross_mark_value,
1026 risk_value: margin.risk_value,
1027 init_margin: margin.init_margin,
1028 maint_margin: margin.maint_margin,
1029 target_excess_margin: margin.target_excess_margin,
1030 realised_pnl: margin.realised_pnl,
1031 unrealised_pnl: margin.unrealised_pnl,
1032 wallet_balance: margin.wallet_balance,
1033 margin_balance: margin.margin_balance,
1034 margin_leverage: margin.margin_leverage,
1035 margin_used_pcnt: margin.margin_used_pcnt,
1036 excess_margin: margin.excess_margin,
1037 available_margin: margin.available_margin,
1038 withdrawable_margin: margin.withdrawable_margin,
1039 maker_fee_discount: None, taker_fee_discount: None, timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1042 foreign_margin_balance: None,
1043 foreign_requirement: None,
1044 };
1045
1046 parse_account_state(&margin_msg, account_id, ts_init)
1047 }
1048
1049 #[allow(clippy::too_many_arguments)]
1056 pub async fn submit_order(
1057 &self,
1058 instrument_id: InstrumentId,
1059 client_order_id: ClientOrderId,
1060 order_side: OrderSide,
1061 order_type: OrderType,
1062 quantity: Quantity,
1063 time_in_force: TimeInForce,
1064 price: Option<Price>,
1065 trigger_price: Option<Price>,
1066 display_qty: Option<Quantity>,
1067 post_only: bool,
1068 reduce_only: bool,
1069 ) -> anyhow::Result<OrderStatusReport> {
1070 use crate::common::enums::{
1071 BitmexExecInstruction, BitmexOrderType, BitmexSide, BitmexTimeInForce,
1072 };
1073
1074 let mut params = super::query::PostOrderParamsBuilder::default();
1075 params.text(NAUTILUS_TRADER);
1076 params.symbol(instrument_id.symbol.as_str());
1077 params.cl_ord_id(client_order_id.as_str());
1078
1079 let side = BitmexSide::try_from_order_side(order_side)?;
1080 params.side(side);
1081
1082 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1083 params.ord_type(ord_type);
1084
1085 params.order_qty(quantity_to_u32(&quantity));
1086
1087 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1088 params.time_in_force(tif);
1089
1090 if let Some(price) = price {
1091 params.price(price.as_f64());
1092 }
1093
1094 if let Some(trigger_price) = trigger_price {
1095 params.stop_px(trigger_price.as_f64());
1096 }
1097
1098 if let Some(display_qty) = display_qty {
1099 params.display_qty(quantity_to_u32(&display_qty));
1100 }
1101
1102 let mut exec_inst = Vec::new();
1103
1104 if post_only {
1105 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1106 }
1107
1108 if reduce_only {
1109 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1110 }
1111
1112 if !exec_inst.is_empty() {
1113 params.exec_inst(exec_inst);
1114 }
1115
1116 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1117
1118 let response = self.inner.http_place_order(params).await?;
1119
1120 let order: BitmexOrder = serde_json::from_value(response)?;
1121
1122 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1123 let reason = order
1124 .ord_rej_reason
1125 .map(|r| r.to_string())
1126 .unwrap_or_else(|| "No reason provided".to_string());
1127 return Err(anyhow::anyhow!("Order rejected: {reason}"));
1128 }
1129
1130 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1131 let ts_init = self.generate_ts_init();
1132
1133 parse_order_status_report(&order, instrument_id, price_precision, ts_init)
1134 }
1135
1136 pub async fn cancel_order(
1146 &self,
1147 instrument_id: InstrumentId,
1148 client_order_id: Option<ClientOrderId>,
1149 venue_order_id: Option<VenueOrderId>,
1150 ) -> anyhow::Result<OrderStatusReport> {
1151 let mut params = super::query::DeleteOrderParamsBuilder::default();
1152 params.text(NAUTILUS_TRADER);
1153
1154 if let Some(venue_order_id) = venue_order_id {
1155 params.order_id(vec![venue_order_id.as_str().to_string()]);
1156 } else if let Some(client_order_id) = client_order_id {
1157 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1158 } else {
1159 return Err(anyhow::anyhow!(
1160 "Either client_order_id or venue_order_id must be provided"
1161 ));
1162 }
1163
1164 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1165
1166 let response = self.inner.http_cancel_orders(params).await?;
1167
1168 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1169 let order = orders
1170 .into_iter()
1171 .next()
1172 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1173
1174 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1175 let ts_init = self.generate_ts_init();
1176
1177 parse_order_status_report(&order, instrument_id, price_precision, ts_init)
1178 }
1179
1180 pub async fn cancel_orders(
1190 &self,
1191 instrument_id: InstrumentId,
1192 client_order_ids: Option<Vec<ClientOrderId>>,
1193 venue_order_ids: Option<Vec<VenueOrderId>>,
1194 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1195 let mut params = super::query::DeleteOrderParamsBuilder::default();
1196 params.text(NAUTILUS_TRADER);
1197
1198 if let Some(client_order_ids) = client_order_ids {
1199 params.cl_ord_id(
1200 client_order_ids
1201 .iter()
1202 .map(|id| id.to_string())
1203 .collect::<Vec<_>>(),
1204 );
1205 }
1206
1207 if let Some(venue_order_ids) = venue_order_ids {
1208 params.order_id(
1209 venue_order_ids
1210 .iter()
1211 .map(|id| id.to_string())
1212 .collect::<Vec<_>>(),
1213 );
1214 }
1215
1216 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1217
1218 let response = self.inner.http_cancel_orders(params).await?;
1219
1220 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1221
1222 let ts_init = self.generate_ts_init();
1223
1224 let mut reports = Vec::new();
1225
1226 for order in orders {
1227 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1228
1229 reports.push(parse_order_status_report(
1230 &order,
1231 instrument_id,
1232 price_precision,
1233 ts_init,
1234 )?);
1235 }
1236
1237 Ok(reports)
1238 }
1239
1240 pub async fn cancel_all_orders(
1250 &self,
1251 instrument_id: InstrumentId,
1252 order_side: Option<OrderSide>,
1253 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1254 let mut params = DeleteAllOrdersParamsBuilder::default();
1255 params.text(NAUTILUS_TRADER);
1256 params.symbol(instrument_id.symbol.as_str());
1257
1258 if let Some(side) = order_side {
1259 let side = BitmexSide::try_from_order_side(side)?;
1260 params.filter(serde_json::json!({
1261 "side": side
1262 }));
1263 }
1264
1265 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1266
1267 let response = self.inner.http_cancel_all_orders(params).await?;
1268
1269 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1270
1271 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1272 let ts_init = self.generate_ts_init();
1273
1274 let mut reports = Vec::new();
1275
1276 for order in orders {
1277 reports.push(parse_order_status_report(
1278 &order,
1279 instrument_id,
1280 price_precision,
1281 ts_init,
1282 )?);
1283 }
1284
1285 Ok(reports)
1286 }
1287
1288 pub async fn modify_order(
1299 &self,
1300 instrument_id: InstrumentId,
1301 client_order_id: Option<ClientOrderId>,
1302 venue_order_id: Option<VenueOrderId>,
1303 quantity: Option<Quantity>,
1304 price: Option<Price>,
1305 trigger_price: Option<Price>,
1306 ) -> anyhow::Result<OrderStatusReport> {
1307 let mut params = PutOrderParamsBuilder::default();
1308 params.text(NAUTILUS_TRADER);
1309
1310 if let Some(venue_order_id) = venue_order_id {
1312 params.order_id(venue_order_id.as_str());
1313 } else if let Some(client_order_id) = client_order_id {
1314 params.orig_cl_ord_id(client_order_id.as_str());
1315 } else {
1316 return Err(anyhow::anyhow!(
1317 "Either client_order_id or venue_order_id must be provided"
1318 ));
1319 }
1320
1321 if let Some(quantity) = quantity {
1322 params.order_qty(quantity_to_u32(&quantity));
1323 }
1324
1325 if let Some(price) = price {
1326 params.price(price.as_f64());
1327 }
1328
1329 if let Some(trigger_price) = trigger_price {
1330 params.stop_px(trigger_price.as_f64());
1331 }
1332
1333 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1334
1335 let response = self.inner.http_amend_order(params).await?;
1336
1337 let order: BitmexOrder = serde_json::from_value(response)?;
1338
1339 if let Some(BitmexOrderStatus::Rejected) = order.ord_status {
1340 let reason = order
1341 .ord_rej_reason
1342 .map(|r| r.to_string())
1343 .unwrap_or_else(|| "No reason provided".to_string());
1344 return Err(anyhow::anyhow!("Order modification rejected: {}", reason));
1345 }
1346
1347 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1348 let ts_init = self.generate_ts_init();
1349
1350 parse_order_status_report(&order, instrument_id, price_precision, ts_init)
1351 }
1352
1353 pub async fn query_order(
1362 &self,
1363 instrument_id: InstrumentId,
1364 client_order_id: Option<ClientOrderId>,
1365 venue_order_id: Option<VenueOrderId>,
1366 ) -> anyhow::Result<Option<OrderStatusReport>> {
1367 let mut params = GetOrderParamsBuilder::default();
1368
1369 let filter_json = if let Some(client_order_id) = client_order_id {
1370 serde_json::json!({
1371 "clOrdID": client_order_id.to_string()
1372 })
1373 } else if let Some(venue_order_id) = venue_order_id {
1374 serde_json::json!({
1375 "orderID": venue_order_id.to_string()
1376 })
1377 } else {
1378 return Err(anyhow::anyhow!(
1379 "Either client_order_id or venue_order_id must be provided"
1380 ));
1381 };
1382
1383 params.filter(filter_json);
1384 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1387
1388 let response = self.inner.http_get_orders(params).await?;
1389
1390 if response.is_empty() {
1391 return Ok(None);
1392 }
1393
1394 let order = &response[0];
1395
1396 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1397 let ts_init = self.generate_ts_init();
1398
1399 let report = parse_order_status_report(order, instrument_id, price_precision, ts_init)?;
1400
1401 Ok(Some(report))
1402 }
1403
1404 pub async fn request_order_status_report(
1413 &self,
1414 instrument_id: InstrumentId,
1415 client_order_id: Option<ClientOrderId>,
1416 venue_order_id: Option<VenueOrderId>,
1417 ) -> anyhow::Result<OrderStatusReport> {
1418 let mut params = GetOrderParamsBuilder::default();
1419 params.symbol(instrument_id.symbol.as_str());
1420
1421 if let Some(venue_order_id) = venue_order_id {
1422 params.filter(serde_json::json!({
1423 "orderID": venue_order_id.as_str()
1424 }));
1425 } else if let Some(client_order_id) = client_order_id {
1426 params.filter(serde_json::json!({
1427 "clOrdID": client_order_id.as_str()
1428 }));
1429 }
1430
1431 params.count(1i32);
1432 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1433
1434 let response = self.inner.http_get_orders(params).await?;
1435
1436 let order = response
1437 .into_iter()
1438 .next()
1439 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
1440
1441 let price_precision = self.get_price_precision(instrument_id.symbol.inner())?;
1442 let ts_init = self.generate_ts_init();
1443
1444 parse_order_status_report(&order, instrument_id, price_precision, ts_init)
1445 }
1446
1447 pub async fn request_order_status_reports(
1456 &self,
1457 instrument_id: Option<InstrumentId>,
1458 open_only: bool,
1459 limit: Option<u32>,
1460 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1461 let mut params = GetOrderParamsBuilder::default();
1462
1463 if let Some(instrument_id) = &instrument_id {
1464 params.symbol(instrument_id.symbol.as_str());
1465 }
1466
1467 if open_only {
1468 params.filter(serde_json::json!({
1469 "open": true
1470 }));
1471 }
1472
1473 if let Some(limit) = limit {
1474 params.count(limit as i32);
1475 } else {
1476 params.count(500); }
1478
1479 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1482
1483 let response = self.inner.http_get_orders(params).await?;
1484
1485 let ts_init = self.generate_ts_init();
1486
1487 let mut reports = Vec::new();
1488
1489 for order in response {
1490 let Some(symbol) = order.symbol else {
1492 tracing::warn!("Order response missing symbol, skipping");
1493 continue;
1494 };
1495
1496 let instrument_id = parse_instrument_id(symbol);
1497 let price_precision = self.get_price_precision(symbol)?;
1498
1499 match parse_order_status_report(&order, instrument_id, price_precision, ts_init) {
1500 Ok(report) => reports.push(report),
1501 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1502 }
1503 }
1504
1505 Ok(reports)
1506 }
1507
1508 pub async fn request_trades(
1514 &self,
1515 instrument_id: InstrumentId,
1516 limit: Option<u32>,
1517 ) -> anyhow::Result<Vec<TradeTick>> {
1518 let mut params = GetTradeParamsBuilder::default();
1519 params.symbol(instrument_id.symbol.as_str());
1520
1521 if let Some(limit) = limit {
1522 params.count(limit as i32);
1523 }
1524 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1525
1526 let response = self.inner.http_get_trades(params).await?;
1527
1528 let ts_init = self.generate_ts_init();
1529
1530 let mut parsed_trades = Vec::new();
1531
1532 for trade in response {
1533 let price_precision = self.get_price_precision(trade.symbol)?;
1534
1535 match parse_trade(trade, price_precision, ts_init) {
1536 Ok(trade) => parsed_trades.push(trade),
1537 Err(e) => tracing::error!("Failed to parse trade: {e}"),
1538 }
1539 }
1540
1541 Ok(parsed_trades)
1542 }
1543
1544 pub async fn request_fill_reports(
1550 &self,
1551 instrument_id: Option<InstrumentId>,
1552 limit: Option<u32>,
1553 ) -> anyhow::Result<Vec<FillReport>> {
1554 let mut params = GetExecutionParamsBuilder::default();
1555 if let Some(instrument_id) = instrument_id {
1556 params.symbol(instrument_id.symbol.as_str());
1557 }
1558 if let Some(limit) = limit {
1559 params.count(limit as i32);
1560 } else {
1561 params.count(500); }
1563 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1566
1567 let response = self.inner.http_get_executions(params).await?;
1568
1569 let ts_init = self.generate_ts_init();
1570
1571 let mut reports = Vec::new();
1572
1573 for exec in response {
1574 let Some(symbol) = exec.symbol else {
1576 tracing::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
1577 continue;
1578 };
1579 let price_precision = self.get_price_precision(symbol)?;
1580
1581 match parse_fill_report(exec, price_precision, ts_init) {
1582 Ok(report) => reports.push(report),
1583 Err(e) => {
1584 let error_msg = e.to_string();
1586 if error_msg.starts_with("Skipping non-trade execution")
1587 || error_msg.starts_with("Skipping execution without order_id")
1588 {
1589 tracing::debug!("{e}");
1590 } else {
1591 tracing::error!("Failed to parse fill report: {e}");
1592 }
1593 }
1594 }
1595 }
1596
1597 Ok(reports)
1598 }
1599
1600 pub async fn request_position_status_reports(
1606 &self,
1607 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1608 let params = GetPositionParamsBuilder::default()
1609 .count(500) .build()
1611 .map_err(|e| anyhow::anyhow!(e))?;
1612
1613 let response = self.inner.http_get_positions(params).await?;
1614
1615 let ts_init = self.generate_ts_init();
1616
1617 let mut reports = Vec::new();
1618
1619 for pos in response {
1620 match parse_position_report(pos, ts_init) {
1621 Ok(report) => reports.push(report),
1622 Err(e) => tracing::error!("Failed to parse position report: {e}"),
1623 }
1624 }
1625
1626 Ok(reports)
1627 }
1628
1629 pub async fn submit_orders_bulk(
1639 &self,
1640 orders: Vec<PostOrderParams>,
1641 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1642 let params = PostOrderBulkParams { orders };
1643
1644 let response = self.inner.http_place_orders_bulk(params).await?;
1645
1646 let ts_init = self.generate_ts_init();
1647 let mut reports = Vec::new();
1648
1649 for order in response {
1650 let Some(symbol) = order.symbol else {
1652 tracing::warn!("Order response missing symbol, skipping");
1653 continue;
1654 };
1655
1656 let instrument_id = parse_instrument_id(symbol);
1657 let price_precision = self.get_price_precision(symbol)?;
1658
1659 match parse_order_status_report(&order, instrument_id, price_precision, ts_init) {
1660 Ok(report) => reports.push(report),
1661 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1662 }
1663 }
1664
1665 Ok(reports)
1666 }
1667
1668 pub async fn modify_orders_bulk(
1679 &self,
1680 orders: Vec<PutOrderParams>,
1681 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1682 let params = PutOrderBulkParams { orders };
1683
1684 let response = self.inner.http_amend_orders_bulk(params).await?;
1685
1686 let ts_init = self.generate_ts_init();
1687 let mut reports = Vec::new();
1688
1689 for order in response {
1690 let Some(symbol) = order.symbol else {
1692 tracing::warn!("Order response missing symbol, skipping");
1693 continue;
1694 };
1695
1696 let instrument_id = parse_instrument_id(symbol);
1697 let price_precision = self.get_price_precision(symbol)?;
1698
1699 match parse_order_status_report(&order, instrument_id, price_precision, ts_init) {
1700 Ok(report) => reports.push(report),
1701 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1702 }
1703 }
1704
1705 Ok(reports)
1706 }
1707
1708 pub async fn update_position_leverage(
1716 &self,
1717 symbol: &str,
1718 leverage: f64,
1719 ) -> anyhow::Result<PositionStatusReport> {
1720 let params = PostPositionLeverageParams {
1721 symbol: symbol.to_string(),
1722 leverage,
1723 target_account_id: None,
1724 };
1725
1726 let response = self.inner.http_update_position_leverage(params).await?;
1727
1728 let ts_init = self.generate_ts_init();
1729
1730 parse_position_report(response, ts_init)
1731 }
1732}
1733
1734#[cfg(test)]
1739mod tests {
1740 use rstest::rstest;
1741 use serde_json::json;
1742
1743 use super::*;
1744
1745 #[rstest]
1746 fn test_sign_request_generates_correct_headers() {
1747 let client = BitmexHttpInnerClient::with_credentials(
1748 "test_api_key".to_string(),
1749 "test_api_secret".to_string(),
1750 "http://localhost:8080".to_string(),
1751 Some(60),
1752 None, None, None, )
1756 .expect("Failed to create test client");
1757
1758 let headers = client
1759 .sign_request(&Method::GET, "/api/v1/order", None)
1760 .unwrap();
1761
1762 assert!(headers.contains_key("api-key"));
1763 assert!(headers.contains_key("api-signature"));
1764 assert!(headers.contains_key("api-expires"));
1765 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
1766 }
1767
1768 #[rstest]
1769 fn test_sign_request_with_body() {
1770 let client = BitmexHttpInnerClient::with_credentials(
1771 "test_api_key".to_string(),
1772 "test_api_secret".to_string(),
1773 "http://localhost:8080".to_string(),
1774 Some(60),
1775 None, None, None, )
1779 .expect("Failed to create test client");
1780
1781 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
1782 let body_bytes = serde_json::to_vec(&body).unwrap();
1783
1784 let headers_without_body = client
1785 .sign_request(&Method::POST, "/api/v1/order", None)
1786 .unwrap();
1787 let headers_with_body = client
1788 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
1789 .unwrap();
1790
1791 assert_ne!(
1793 headers_without_body.get("api-signature").unwrap(),
1794 headers_with_body.get("api-signature").unwrap()
1795 );
1796 }
1797}