1use std::{
21 collections::HashMap,
22 fmt::{Debug, Formatter},
23 num::NonZeroU32,
24 sync::{
25 Arc, LazyLock,
26 atomic::{AtomicBool, Ordering},
27 },
28};
29
30use chrono::{DateTime, Utc};
31use dashmap::DashMap;
32use nautilus_core::{
33 consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 data::{Bar, BarType, TradeTick},
37 enums::{OrderSide, OrderType, PositionSideSpecified, TimeInForce},
38 events::account::state::AccountState,
39 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{Price, Quantity},
43};
44use nautilus_network::{
45 http::HttpClient,
46 ratelimiter::quota::Quota,
47 retry::{RetryConfig, RetryManager},
48};
49use reqwest::{Method, header::USER_AGENT};
50use rust_decimal::Decimal;
51use serde::{Serialize, de::DeserializeOwned};
52use tokio_util::sync::CancellationToken;
53use ustr::Ustr;
54
55use super::{
56 error::BybitHttpError,
57 models::{
58 BybitAccountDetailsResponse, BybitBorrowResponse, BybitFeeRate, BybitFeeRateResponse,
59 BybitInstrumentInverseResponse, BybitInstrumentLinearResponse,
60 BybitInstrumentOptionResponse, BybitInstrumentSpotResponse, BybitKlinesResponse,
61 BybitNoConvertRepayResponse, BybitOpenOrdersResponse, BybitOrderHistoryResponse,
62 BybitPlaceOrderResponse, BybitPositionListResponse, BybitServerTimeResponse,
63 BybitSetLeverageResponse, BybitSetMarginModeResponse, BybitSetTradingStopResponse,
64 BybitSwitchModeResponse, BybitTradeHistoryResponse, BybitTradesResponse,
65 BybitWalletBalanceResponse,
66 },
67 query::{
68 BybitAmendOrderParamsBuilder, BybitBatchAmendOrderEntryBuilder,
69 BybitBatchCancelOrderEntryBuilder, BybitBatchCancelOrderParamsBuilder,
70 BybitBatchPlaceOrderEntryBuilder, BybitBorrowParamsBuilder,
71 BybitCancelAllOrdersParamsBuilder, BybitCancelOrderParamsBuilder, BybitFeeRateParams,
72 BybitInstrumentsInfoParams, BybitKlinesParams, BybitKlinesParamsBuilder,
73 BybitNoConvertRepayParamsBuilder, BybitOpenOrdersParamsBuilder,
74 BybitOrderHistoryParamsBuilder, BybitPlaceOrderParamsBuilder, BybitPositionListParams,
75 BybitSetLeverageParamsBuilder, BybitSetMarginModeParamsBuilder, BybitSetTradingStopParams,
76 BybitSwitchModeParamsBuilder, BybitTickersParams, BybitTradeHistoryParams,
77 BybitTradesParams, BybitTradesParamsBuilder, BybitWalletBalanceParams,
78 },
79};
80use crate::{
81 common::{
82 consts::BYBIT_NAUTILUS_BROKER_ID,
83 credential::Credential,
84 enums::{
85 BybitAccountType, BybitEnvironment, BybitMarginMode, BybitOrderSide, BybitOrderType,
86 BybitPositionMode, BybitProductType, BybitTimeInForce,
87 },
88 models::BybitResponse,
89 parse::{
90 bar_spec_to_bybit_interval, make_bybit_symbol, parse_account_state, parse_fill_report,
91 parse_inverse_instrument, parse_kline_bar, parse_linear_instrument,
92 parse_option_instrument, parse_order_status_report, parse_position_status_report,
93 parse_spot_instrument, parse_trade_tick,
94 },
95 symbol::BybitSymbol,
96 urls::bybit_http_base_url,
97 },
98 http::query::BybitFeeRateParamsBuilder,
99};
100
/// Fallback receive window in milliseconds, used when a constructor is given
/// `recv_window_ms: None`.
const DEFAULT_RECV_WINDOW_MS: u64 = 5_000;

/// Default REST quota: 10 per second.
///
/// Registered under [`BYBIT_GLOBAL_RATE_KEY`] and also passed to the HTTP client as its
/// default quota.
pub static BYBIT_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
    Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
});

/// Quota for the no-convert-repay route: 1 per second.
///
/// Registered under [`BYBIT_REPAY_ROUTE_KEY`].
pub static BYBIT_REPAY_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
    Quota::per_second(NonZeroU32::new(1).expect("Should be a valid non-zero u32"))
});

/// Rate-limit key attached to every request issued by the raw client.
const BYBIT_GLOBAL_RATE_KEY: &str = "bybit:global";
/// Rate-limit key for the repay route; equals `"bybit:"` followed by the endpoint path,
/// which is the form `rate_limit_keys` derives per request.
const BYBIT_REPAY_ROUTE_KEY: &str = "bybit:/v5/account/no-convert-repay";
120
/// Low-level Bybit REST client that signs requests, applies rate-limit keys and retries,
/// and returns the venue's response models.
pub struct BybitRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client, constructed with the default headers and rate-limit quotas.
    client: HttpClient,
    /// API credential; `None` for clients that can only call unauthenticated endpoints.
    credential: Option<Credential>,
    /// Receive window in milliseconds, included in the signature and sent as
    /// `X-BAPI-RECV-WINDOW`.
    recv_window_ms: u64,
    /// Retry policy executor used by `send_request`.
    retry_manager: RetryManager<BybitHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
}
133
134impl Default for BybitRawHttpClient {
135 fn default() -> Self {
136 Self::new(None, Some(60), None, None, None, None, None)
137 .expect("Failed to create default BybitRawHttpClient")
138 }
139}
140
141impl Debug for BybitRawHttpClient {
142 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("BybitRawHttpClient")
144 .field("base_url", &self.base_url)
145 .field("has_credentials", &self.credential.is_some())
146 .field("recv_window_ms", &self.recv_window_ms)
147 .finish()
148 }
149}
150
151impl BybitRawHttpClient {
    /// Cancels the client's cancellation token, which is the token handed to the retry
    /// manager for every request.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
156
    /// Returns a reference to the cancellation token used for this client's requests.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
161
162 #[allow(clippy::too_many_arguments)]
168 pub fn new(
169 base_url: Option<String>,
170 timeout_secs: Option<u64>,
171 max_retries: Option<u32>,
172 retry_delay_ms: Option<u64>,
173 retry_delay_max_ms: Option<u64>,
174 recv_window_ms: Option<u64>,
175 proxy_url: Option<String>,
176 ) -> Result<Self, BybitHttpError> {
177 let retry_config = RetryConfig {
178 max_retries: max_retries.unwrap_or(3),
179 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
180 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
181 backoff_factor: 2.0,
182 jitter_ms: 1000,
183 operation_timeout_ms: Some(60_000),
184 immediate_first: false,
185 max_elapsed_ms: Some(180_000),
186 };
187
188 let retry_manager = RetryManager::new(retry_config);
189
190 Ok(Self {
191 base_url: base_url
192 .unwrap_or_else(|| bybit_http_base_url(BybitEnvironment::Mainnet).to_string()),
193 client: HttpClient::new(
194 Self::default_headers(),
195 vec![],
196 Self::rate_limiter_quotas(),
197 Some(*BYBIT_REST_QUOTA),
198 timeout_secs,
199 proxy_url,
200 )
201 .map_err(|e| {
202 BybitHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
203 })?,
204 credential: None,
205 recv_window_ms: recv_window_ms.unwrap_or(DEFAULT_RECV_WINDOW_MS),
206 retry_manager,
207 cancellation_token: CancellationToken::new(),
208 })
209 }
210
211 #[allow(clippy::too_many_arguments)]
217 pub fn with_credentials(
218 api_key: String,
219 api_secret: String,
220 base_url: Option<String>,
221 timeout_secs: Option<u64>,
222 max_retries: Option<u32>,
223 retry_delay_ms: Option<u64>,
224 retry_delay_max_ms: Option<u64>,
225 recv_window_ms: Option<u64>,
226 proxy_url: Option<String>,
227 ) -> Result<Self, BybitHttpError> {
228 let retry_config = RetryConfig {
229 max_retries: max_retries.unwrap_or(3),
230 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
231 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
232 backoff_factor: 2.0,
233 jitter_ms: 1000,
234 operation_timeout_ms: Some(60_000),
235 immediate_first: false,
236 max_elapsed_ms: Some(180_000),
237 };
238
239 let retry_manager = RetryManager::new(retry_config);
240
241 Ok(Self {
242 base_url: base_url
243 .unwrap_or_else(|| bybit_http_base_url(BybitEnvironment::Mainnet).to_string()),
244 client: HttpClient::new(
245 Self::default_headers(),
246 vec![],
247 Self::rate_limiter_quotas(),
248 Some(*BYBIT_REST_QUOTA),
249 timeout_secs,
250 proxy_url,
251 )
252 .map_err(|e| {
253 BybitHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
254 })?,
255 credential: Some(Credential::new(api_key, api_secret)),
256 recv_window_ms: recv_window_ms.unwrap_or(DEFAULT_RECV_WINDOW_MS),
257 retry_manager,
258 cancellation_token: CancellationToken::new(),
259 })
260 }
261
262 fn default_headers() -> HashMap<String, String> {
263 HashMap::from([
264 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
265 ("Referer".to_string(), BYBIT_NAUTILUS_BROKER_ID.to_string()),
266 ])
267 }
268
269 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
270 vec![
271 (BYBIT_GLOBAL_RATE_KEY.to_string(), *BYBIT_REST_QUOTA),
272 (BYBIT_REPAY_ROUTE_KEY.to_string(), *BYBIT_REPAY_QUOTA),
273 ]
274 }
275
276 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
277 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
278 let route = format!("bybit:{normalized}");
279
280 vec![BYBIT_GLOBAL_RATE_KEY.to_string(), route]
281 }
282
283 fn sign_request(
284 &self,
285 timestamp: &str,
286 params: Option<&str>,
287 ) -> Result<HashMap<String, String>, BybitHttpError> {
288 let credential = self
289 .credential
290 .as_ref()
291 .ok_or(BybitHttpError::MissingCredentials)?;
292
293 let signature = credential.sign_with_payload(timestamp, self.recv_window_ms, params);
294
295 let mut headers = HashMap::new();
296 headers.insert(
297 "X-BAPI-API-KEY".to_string(),
298 credential.api_key().to_string(),
299 );
300 headers.insert("X-BAPI-TIMESTAMP".to_string(), timestamp.to_string());
301 headers.insert("X-BAPI-SIGN".to_string(), signature);
302 headers.insert(
303 "X-BAPI-RECV-WINDOW".to_string(),
304 self.recv_window_ms.to_string(),
305 );
306
307 Ok(headers)
308 }
309
310 async fn send_request<T: DeserializeOwned, P: Serialize>(
311 &self,
312 method: Method,
313 endpoint: &str,
314 params: Option<&P>,
315 body: Option<Vec<u8>>,
316 authenticate: bool,
317 ) -> Result<T, BybitHttpError> {
318 let endpoint = endpoint.to_string();
319 let url = format!("{}{endpoint}", self.base_url);
320 let method_clone = method.clone();
321 let body_clone = body.clone();
322
323 let params_str = if method == Method::GET {
325 params
326 .map(serde_urlencoded::to_string)
327 .transpose()
328 .map_err(|e| {
329 BybitHttpError::JsonError(format!("Failed to serialize params: {e}"))
330 })?
331 } else {
332 None
333 };
334
335 let operation = || {
336 let url = url.clone();
337 let method = method_clone.clone();
338 let body = body_clone.clone();
339 let endpoint = endpoint.clone();
340 let params_str = params_str.clone();
341
342 async move {
343 let mut headers = Self::default_headers();
344
345 if authenticate {
346 let timestamp = get_atomic_clock_realtime().get_time_ms().to_string();
347
348 let sign_payload = if method == Method::GET {
349 params_str.as_deref()
350 } else {
351 body.as_ref().and_then(|b| std::str::from_utf8(b).ok())
352 };
353
354 let auth_headers = self.sign_request(×tamp, sign_payload)?;
355 headers.extend(auth_headers);
356 }
357
358 if method == Method::POST || method == Method::PUT {
359 headers.insert("Content-Type".to_string(), "application/json".to_string());
360 }
361
362 let full_url = if let Some(ref query) = params_str {
363 if query.is_empty() {
364 url
365 } else {
366 format!("{}?{}", url, query)
367 }
368 } else {
369 url
370 };
371
372 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
373
374 let response = self
375 .client
376 .request(
377 method,
378 full_url,
379 None,
380 Some(headers),
381 body,
382 None,
383 Some(rate_limit_keys),
384 )
385 .await?;
386
387 if response.status.as_u16() >= 400 {
388 let body = String::from_utf8_lossy(&response.body).to_string();
389 return Err(BybitHttpError::UnexpectedStatus {
390 status: response.status.as_u16(),
391 body,
392 });
393 }
394
395 let bybit_response: BybitResponse<serde_json::Value> =
397 serde_json::from_slice(&response.body)?;
398
399 if bybit_response.ret_code != 0 {
400 return Err(BybitHttpError::BybitError {
401 error_code: bybit_response.ret_code as i32,
402 message: bybit_response.ret_msg,
403 });
404 }
405
406 let result: T = serde_json::from_slice(&response.body)?;
408 Ok(result)
409 }
410 };
411
412 let should_retry = |error: &BybitHttpError| -> bool {
413 match error {
414 BybitHttpError::NetworkError(_) => true,
415 BybitHttpError::UnexpectedStatus { status, .. } => *status >= 500,
416 _ => false,
417 }
418 };
419
420 let create_error = |msg: String| -> BybitHttpError {
421 if msg == "canceled" {
422 BybitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
423 } else {
424 BybitHttpError::NetworkError(msg)
425 }
426 };
427
428 self.retry_manager
429 .execute_with_retry_with_cancel(
430 endpoint.as_str(),
431 operation,
432 should_retry,
433 create_error,
434 &self.cancellation_token,
435 )
436 .await
437 }
438
439 #[cfg(test)]
440 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, BybitHttpError> {
441 let query = serde_urlencoded::to_string(params)
442 .map_err(|e| BybitHttpError::JsonError(e.to_string()))?;
443 if query.is_empty() {
444 Ok(base.to_owned())
445 } else {
446 Ok(format!("{base}?{query}"))
447 }
448 }
449
    /// Fetches the server time with an unauthenticated `GET /v5/market/time`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the venue reports a non-zero return code,
    /// or the response cannot be deserialized.
    pub async fn get_server_time(&self) -> Result<BybitServerTimeResponse, BybitHttpError> {
        self.send_request::<_, ()>(Method::GET, "/v5/market/time", None, None, false)
            .await
    }
463
    /// Fetches instrument info with an unauthenticated `GET /v5/market/instruments-info`,
    /// deserializing into the caller-chosen response type `T`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the venue reports a non-zero return code,
    /// or the response cannot be deserialized into `T`.
    pub async fn get_instruments<T: DeserializeOwned>(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<T, BybitHttpError> {
        self.send_request(
            Method::GET,
            "/v5/market/instruments-info",
            Some(params),
            None,
            false,
        )
        .await
    }
486
    /// Calls [`Self::get_instruments`] with the spot response type.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Self::get_instruments`].
    pub async fn get_instruments_spot(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentSpotResponse, BybitHttpError> {
        self.get_instruments(params).await
    }
502
    /// Calls [`Self::get_instruments`] with the linear response type.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Self::get_instruments`].
    pub async fn get_instruments_linear(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentLinearResponse, BybitHttpError> {
        self.get_instruments(params).await
    }
518
    /// Calls [`Self::get_instruments`] with the inverse response type.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Self::get_instruments`].
    pub async fn get_instruments_inverse(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentInverseResponse, BybitHttpError> {
        self.get_instruments(params).await
    }
534
    /// Calls [`Self::get_instruments`] with the option response type.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Self::get_instruments`].
    pub async fn get_instruments_option(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentOptionResponse, BybitHttpError> {
        self.get_instruments(params).await
    }
550
    /// Fetches klines with an unauthenticated `GET /v5/market/kline`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the venue reports a non-zero return code,
    /// or the response cannot be deserialized.
    pub async fn get_klines(
        &self,
        params: &BybitKlinesParams,
    ) -> Result<BybitKlinesResponse, BybitHttpError> {
        self.send_request(Method::GET, "/v5/market/kline", Some(params), None, false)
            .await
    }
567
    /// Fetches recent trades with an unauthenticated `GET /v5/market/recent-trade`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the venue reports a non-zero return code,
    /// or the response cannot be deserialized.
    pub async fn get_recent_trades(
        &self,
        params: &BybitTradesParams,
    ) -> Result<BybitTradesResponse, BybitHttpError> {
        self.send_request(
            Method::GET,
            "/v5/market/recent-trade",
            Some(params),
            None,
            false,
        )
        .await
    }
590
591 pub async fn get_open_orders(
601 &self,
602 category: BybitProductType,
603 symbol: Option<&str>,
604 ) -> Result<BybitOpenOrdersResponse, BybitHttpError> {
605 #[derive(Serialize)]
606 #[serde(rename_all = "camelCase")]
607 struct Params<'a> {
608 category: BybitProductType,
609 #[serde(skip_serializing_if = "Option::is_none")]
610 symbol: Option<&'a str>,
611 }
612
613 let params = Params { category, symbol };
614 self.send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
615 .await
616 }
617
    /// Places an order with an authenticated `POST /v5/order/create`, sending `request`
    /// serialized as the JSON body.
    ///
    /// # Errors
    ///
    /// Returns an error if `request` cannot be serialized, credentials are missing, the
    /// request fails, the venue reports a non-zero return code, or the response cannot
    /// be deserialized.
    pub async fn place_order(
        &self,
        request: &serde_json::Value,
    ) -> Result<BybitPlaceOrderResponse, BybitHttpError> {
        let body = serde_json::to_vec(request)?;
        self.send_request::<_, ()>(Method::POST, "/v5/order/create", None, Some(body), true)
            .await
    }
635
    /// Fetches the wallet balance with an authenticated `GET /v5/account/wallet-balance`.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials are missing, the request fails, the venue
    /// reports a non-zero return code, or the response cannot be deserialized.
    pub async fn get_wallet_balance(
        &self,
        params: &BybitWalletBalanceParams,
    ) -> Result<BybitWalletBalanceResponse, BybitHttpError> {
        self.send_request(
            Method::GET,
            "/v5/account/wallet-balance",
            Some(params),
            None,
            true,
        )
        .await
    }
658
    /// Fetches account details with an authenticated `GET /v5/user/query-api`.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials are missing, the request fails, the venue
    /// reports a non-zero return code, or the response cannot be deserialized.
    pub async fn get_account_details(&self) -> Result<BybitAccountDetailsResponse, BybitHttpError> {
        self.send_request::<_, ()>(Method::GET, "/v5/user/query-api", None, None, true)
            .await
    }
672
    /// Fetches fee rates with an authenticated `GET /v5/account/fee-rate`.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials are missing, the request fails, the venue
    /// reports a non-zero return code, or the response cannot be deserialized.
    pub async fn get_fee_rate(
        &self,
        params: &BybitFeeRateParams,
    ) -> Result<BybitFeeRateResponse, BybitHttpError> {
        self.send_request(
            Method::GET,
            "/v5/account/fee-rate",
            Some(params),
            None,
            true,
        )
        .await
    }
695
696 pub async fn set_margin_mode(
713 &self,
714 margin_mode: BybitMarginMode,
715 ) -> Result<BybitSetMarginModeResponse, BybitHttpError> {
716 let params = BybitSetMarginModeParamsBuilder::default()
717 .set_margin_mode(margin_mode)
718 .build()
719 .expect("Failed to build BybitSetMarginModeParams");
720
721 let body = serde_json::to_vec(¶ms)?;
722 self.send_request::<_, ()>(
723 Method::POST,
724 "/v5/account/set-margin-mode",
725 None,
726 Some(body),
727 true,
728 )
729 .await
730 }
731
732 pub async fn set_leverage(
749 &self,
750 product_type: BybitProductType,
751 symbol: &str,
752 buy_leverage: &str,
753 sell_leverage: &str,
754 ) -> Result<BybitSetLeverageResponse, BybitHttpError> {
755 let params = BybitSetLeverageParamsBuilder::default()
756 .category(product_type)
757 .symbol(symbol.to_string())
758 .buy_leverage(buy_leverage.to_string())
759 .sell_leverage(sell_leverage.to_string())
760 .build()
761 .expect("Failed to build BybitSetLeverageParams");
762
763 let body = serde_json::to_vec(¶ms)?;
764 self.send_request::<_, ()>(
765 Method::POST,
766 "/v5/position/set-leverage",
767 None,
768 Some(body),
769 true,
770 )
771 .await
772 }
773
774 pub async fn switch_mode(
791 &self,
792 product_type: BybitProductType,
793 mode: BybitPositionMode,
794 symbol: Option<String>,
795 coin: Option<String>,
796 ) -> Result<BybitSwitchModeResponse, BybitHttpError> {
797 let mut builder = BybitSwitchModeParamsBuilder::default();
798 builder.category(product_type);
799 builder.mode(mode);
800
801 if let Some(s) = symbol {
802 builder.symbol(s);
803 }
804 if let Some(c) = coin {
805 builder.coin(c);
806 }
807
808 let params = builder
809 .build()
810 .expect("Failed to build BybitSwitchModeParams");
811
812 let body = serde_json::to_vec(¶ms)?;
813 self.send_request::<_, ()>(
814 Method::POST,
815 "/v5/position/switch-mode",
816 None,
817 Some(body),
818 true,
819 )
820 .await
821 }
822
    /// Sends `params` as the JSON body of an authenticated
    /// `POST /v5/position/trading-stop`.
    ///
    /// # Errors
    ///
    /// Returns an error if `params` cannot be serialized, credentials are missing, the
    /// request fails, the venue reports a non-zero return code, or the response cannot
    /// be deserialized.
    pub async fn set_trading_stop(
        &self,
        params: &BybitSetTradingStopParams,
    ) -> Result<BybitSetTradingStopResponse, BybitHttpError> {
        let body = serde_json::to_vec(params)?;
        self.send_request::<_, ()>(
            Method::POST,
            "/v5/position/trading-stop",
            None,
            Some(body),
            true,
        )
        .await
    }
849
850 pub async fn borrow(
867 &self,
868 coin: &str,
869 amount: &str,
870 ) -> Result<BybitBorrowResponse, BybitHttpError> {
871 let params = BybitBorrowParamsBuilder::default()
872 .coin(coin.to_string())
873 .amount(amount.to_string())
874 .build()
875 .expect("Failed to build BybitBorrowParams");
876
877 let body = serde_json::to_vec(¶ms)?;
878 self.send_request::<_, ()>(Method::POST, "/v5/account/borrow", None, Some(body), true)
879 .await
880 }
881
882 pub async fn no_convert_repay(
900 &self,
901 coin: &str,
902 amount: Option<&str>,
903 ) -> Result<BybitNoConvertRepayResponse, BybitHttpError> {
904 let mut builder = BybitNoConvertRepayParamsBuilder::default();
905 builder.coin(coin.to_string());
906
907 if let Some(amt) = amount {
908 builder.amount(amt.to_string());
909 }
910
911 let params = builder
912 .build()
913 .expect("Failed to build BybitNoConvertRepayParams");
914
915 if let Ok(params_json) = serde_json::to_string(¶ms) {
917 tracing::debug!("Repay request params: {params_json}");
918 }
919
920 let body = serde_json::to_vec(¶ms)?;
921 let result = self
922 .send_request::<_, ()>(
923 Method::POST,
924 "/v5/account/no-convert-repay",
925 None,
926 Some(body),
927 true,
928 )
929 .await;
930
931 if let Err(ref e) = result
933 && let Ok(params_json) = serde_json::to_string(¶ms)
934 {
935 tracing::error!("Repay request failed with params {params_json}: {e}");
936 }
937
938 result
939 }
940
    /// Fetches tickers with an unauthenticated `GET /v5/market/tickers`, deserializing
    /// into the caller-chosen response type `T`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the venue reports a non-zero return code,
    /// or the response cannot be deserialized into `T`.
    pub async fn get_tickers<T: DeserializeOwned>(
        &self,
        params: &BybitTickersParams,
    ) -> Result<T, BybitHttpError> {
        self.send_request(Method::GET, "/v5/market/tickers", Some(params), None, false)
            .await
    }
957
    /// Fetches trade history with an authenticated `GET /v5/execution/list`.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials are missing, the request fails, the venue
    /// reports a non-zero return code, or the response cannot be deserialized.
    pub async fn get_trade_history(
        &self,
        params: &BybitTradeHistoryParams,
    ) -> Result<BybitTradeHistoryResponse, BybitHttpError> {
        self.send_request(Method::GET, "/v5/execution/list", Some(params), None, true)
            .await
    }
974
    /// Fetches positions with an authenticated `GET /v5/position/list`.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials are missing, the request fails, the venue
    /// reports a non-zero return code, or the response cannot be deserialized.
    pub async fn get_positions(
        &self,
        params: &BybitPositionListParams,
    ) -> Result<BybitPositionListResponse, BybitHttpError> {
        self.send_request(Method::GET, "/v5/position/list", Some(params), None, true)
            .await
    }
994
    /// Returns the base URL that endpoint paths are appended to.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
1000
    /// Returns the configured receive window in milliseconds.
    #[must_use]
    pub fn recv_window_ms(&self) -> u64 {
        self.recv_window_ms
    }
1006
    /// Returns the API credential, or `None` if the client was created without one.
    #[must_use]
    pub fn credential(&self) -> Option<&Credential> {
        self.credential.as_ref()
    }
1012}
1013
/// Higher-level Bybit HTTP client that wraps a shared [`BybitRawHttpClient`] and keeps an
/// instrument cache.
///
/// Every field is behind an `Arc`, so clones share the raw client, the cache and the flags.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BybitHttpClient {
    /// Shared raw client that performs the actual requests.
    pub(crate) inner: Arc<BybitRawHttpClient>,
    /// Instruments keyed by the inner `Ustr` of their symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once `cache_instrument` or `cache_instruments` has been called.
    cache_initialized: Arc<AtomicBool>,
    /// Flag stored by `set_use_spot_position_reports`.
    // NOTE(review): where this flag is read is not visible in this part of the file.
    use_spot_position_reports: Arc<AtomicBool>,
}
1029
1030impl Clone for BybitHttpClient {
1031 fn clone(&self) -> Self {
1032 Self {
1033 inner: self.inner.clone(),
1034 instruments_cache: self.instruments_cache.clone(),
1035 cache_initialized: self.cache_initialized.clone(),
1036 use_spot_position_reports: self.use_spot_position_reports.clone(),
1037 }
1038 }
1039}
1040
1041impl Default for BybitHttpClient {
1042 fn default() -> Self {
1043 Self::new(None, Some(60), None, None, None, None, None)
1044 .expect("Failed to create default BybitHttpClient")
1045 }
1046}
1047
1048impl Debug for BybitHttpClient {
1049 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1050 f.debug_struct("BybitHttpClient")
1051 .field("inner", &self.inner)
1052 .finish()
1053 }
1054}
1055
1056impl BybitHttpClient {
1057 #[allow(clippy::too_many_arguments)]
1063 pub fn new(
1064 base_url: Option<String>,
1065 timeout_secs: Option<u64>,
1066 max_retries: Option<u32>,
1067 retry_delay_ms: Option<u64>,
1068 retry_delay_max_ms: Option<u64>,
1069 recv_window_ms: Option<u64>,
1070 proxy_url: Option<String>,
1071 ) -> Result<Self, BybitHttpError> {
1072 Ok(Self {
1073 inner: Arc::new(BybitRawHttpClient::new(
1074 base_url,
1075 timeout_secs,
1076 max_retries,
1077 retry_delay_ms,
1078 retry_delay_max_ms,
1079 recv_window_ms,
1080 proxy_url,
1081 )?),
1082 instruments_cache: Arc::new(DashMap::new()),
1083 cache_initialized: Arc::new(AtomicBool::new(false)),
1084 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1085 })
1086 }
1087
1088 #[allow(clippy::too_many_arguments)]
1094 pub fn with_credentials(
1095 api_key: String,
1096 api_secret: String,
1097 base_url: Option<String>,
1098 timeout_secs: Option<u64>,
1099 max_retries: Option<u32>,
1100 retry_delay_ms: Option<u64>,
1101 retry_delay_max_ms: Option<u64>,
1102 recv_window_ms: Option<u64>,
1103 proxy_url: Option<String>,
1104 ) -> Result<Self, BybitHttpError> {
1105 Ok(Self {
1106 inner: Arc::new(BybitRawHttpClient::with_credentials(
1107 api_key,
1108 api_secret,
1109 base_url,
1110 timeout_secs,
1111 max_retries,
1112 retry_delay_ms,
1113 retry_delay_max_ms,
1114 recv_window_ms,
1115 proxy_url,
1116 )?),
1117 instruments_cache: Arc::new(DashMap::new()),
1118 cache_initialized: Arc::new(AtomicBool::new(false)),
1119 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1120 })
1121 }
1122
    /// Returns the base URL of the wrapped raw client.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url()
    }
1127
    /// Returns the receive window in milliseconds of the wrapped raw client.
    #[must_use]
    pub fn recv_window_ms(&self) -> u64 {
        self.inner.recv_window_ms()
    }
1132
    /// Returns the API credential of the wrapped raw client, if it has one.
    #[must_use]
    pub fn credential(&self) -> Option<&Credential> {
        self.inner.credential()
    }
1137
    /// Stores the `use_spot_position_reports` flag (relaxed ordering); the flag is shared
    /// with all clones of this client.
    pub fn set_use_spot_position_reports(&self, use_spot_position_reports: bool) {
        self.use_spot_position_reports
            .store(use_spot_position_reports, Ordering::Relaxed);
    }
1142
    /// Cancels the wrapped raw client's cancellation token.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1146
    /// Returns the wrapped raw client's cancellation token.
    pub fn cancellation_token(&self) -> &CancellationToken {
        self.inner.cancellation_token()
    }
1150
1151 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1153 self.instruments_cache
1154 .insert(instrument.symbol().inner(), instrument);
1155 self.cache_initialized.store(true, Ordering::Release);
1156 }
1157
1158 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1160 for instrument in instruments {
1161 self.instruments_cache
1162 .insert(instrument.symbol().inner(), instrument);
1163 }
1164 self.cache_initialized.store(true, Ordering::Release);
1165 }
1166
1167 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1168 self.instruments_cache
1169 .get(symbol)
1170 .map(|entry| entry.value().clone())
1171 }
1172
1173 fn instrument_from_cache(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
1174 self.get_instrument(&symbol.inner()).ok_or_else(|| {
1175 anyhow::anyhow!(
1176 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1177 )
1178 })
1179 }
1180
    /// Returns the current time in nanoseconds from the realtime atomic clock, used as
    /// the initialization timestamp for generated reports.
    #[must_use]
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
1185
    /// Delegates to the raw client's `get_server_time`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_server_time(&self) -> Result<BybitServerTimeResponse, BybitHttpError> {
        self.inner.get_server_time().await
    }
1200
    /// Delegates to the raw client's `get_instruments`, deserializing into `T`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_instruments<T: DeserializeOwned>(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<T, BybitHttpError> {
        self.inner.get_instruments(params).await
    }
1218
    /// Delegates to the raw client's `get_instruments_spot`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_instruments_spot(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentSpotResponse, BybitHttpError> {
        self.inner.get_instruments_spot(params).await
    }
1236
    /// Delegates to the raw client's `get_instruments_linear`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_instruments_linear(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentLinearResponse, BybitHttpError> {
        self.inner.get_instruments_linear(params).await
    }
1254
    /// Delegates to the raw client's `get_instruments_inverse`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_instruments_inverse(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentInverseResponse, BybitHttpError> {
        self.inner.get_instruments_inverse(params).await
    }
1272
    /// Delegates to the raw client's `get_instruments_option`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_instruments_option(
        &self,
        params: &BybitInstrumentsInfoParams,
    ) -> Result<BybitInstrumentOptionResponse, BybitHttpError> {
        self.inner.get_instruments_option(params).await
    }
1290
    /// Delegates to the raw client's `get_klines`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_klines(
        &self,
        params: &BybitKlinesParams,
    ) -> Result<BybitKlinesResponse, BybitHttpError> {
        self.inner.get_klines(params).await
    }
1308
    /// Delegates to the raw client's `get_recent_trades`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_recent_trades(
        &self,
        params: &BybitTradesParams,
    ) -> Result<BybitTradesResponse, BybitHttpError> {
        self.inner.get_recent_trades(params).await
    }
1326
    /// Delegates to the raw client's `get_open_orders`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_open_orders(
        &self,
        category: BybitProductType,
        symbol: Option<&str>,
    ) -> Result<BybitOpenOrdersResponse, BybitHttpError> {
        self.inner.get_open_orders(category, symbol).await
    }
1345
    /// Delegates to the raw client's `place_order`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn place_order(
        &self,
        request: &serde_json::Value,
    ) -> Result<BybitPlaceOrderResponse, BybitHttpError> {
        self.inner.place_order(request).await
    }
1363
    /// Delegates to the raw client's `get_wallet_balance`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_wallet_balance(
        &self,
        params: &BybitWalletBalanceParams,
    ) -> Result<BybitWalletBalanceResponse, BybitHttpError> {
        self.inner.get_wallet_balance(params).await
    }
1381
    /// Delegates to the raw client's `get_account_details`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_account_details(&self) -> Result<BybitAccountDetailsResponse, BybitHttpError> {
        self.inner.get_account_details().await
    }
1396
    /// Delegates to the raw client's `get_positions`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_positions(
        &self,
        params: &BybitPositionListParams,
    ) -> Result<BybitPositionListResponse, BybitHttpError> {
        self.inner.get_positions(params).await
    }
1415
    /// Delegates to the raw client's `get_fee_rate`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn get_fee_rate(
        &self,
        params: &BybitFeeRateParams,
    ) -> Result<BybitFeeRateResponse, BybitHttpError> {
        self.inner.get_fee_rate(params).await
    }
1434
    /// Delegates to the raw client's `set_margin_mode`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn set_margin_mode(
        &self,
        margin_mode: BybitMarginMode,
    ) -> Result<BybitSetMarginModeResponse, BybitHttpError> {
        self.inner.set_margin_mode(margin_mode).await
    }
1453
    /// Delegates to the raw client's `set_leverage`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn set_leverage(
        &self,
        product_type: BybitProductType,
        symbol: &str,
        buy_leverage: &str,
        sell_leverage: &str,
    ) -> Result<BybitSetLeverageResponse, BybitHttpError> {
        self.inner
            .set_leverage(product_type, symbol, buy_leverage, sell_leverage)
            .await
    }
1477
    /// Delegates to the raw client's `switch_mode`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn switch_mode(
        &self,
        product_type: BybitProductType,
        mode: BybitPositionMode,
        symbol: Option<String>,
        coin: Option<String>,
    ) -> Result<BybitSwitchModeResponse, BybitHttpError> {
        self.inner
            .switch_mode(product_type, mode, symbol, coin)
            .await
    }
1501
    /// Delegates to the raw client's `set_trading_stop`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the raw client.
    pub async fn set_trading_stop(
        &self,
        params: &BybitSetTradingStopParams,
    ) -> Result<BybitSetTradingStopResponse, BybitHttpError> {
        self.inner.set_trading_stop(params).await
    }
1520
1521 pub async fn get_spot_borrow_amount(&self, coin: &str) -> anyhow::Result<Decimal> {
1536 let params = BybitWalletBalanceParams {
1537 account_type: BybitAccountType::Unified,
1538 coin: Some(coin.to_string()),
1539 };
1540
1541 let response = self.inner.get_wallet_balance(¶ms).await?;
1542
1543 let borrow_amount = response
1544 .result
1545 .list
1546 .first()
1547 .and_then(|wallet| wallet.coin.iter().find(|c| c.coin.as_str() == coin))
1548 .map_or(Decimal::ZERO, |balance| balance.spot_borrow);
1549
1550 Ok(borrow_amount)
1551 }
1552
1553 pub async fn borrow_spot(
1569 &self,
1570 coin: &str,
1571 amount: Quantity,
1572 ) -> anyhow::Result<BybitBorrowResponse> {
1573 let amount_str = amount.to_string();
1574 self.inner
1575 .borrow(coin, &amount_str)
1576 .await
1577 .map_err(|e| anyhow::anyhow!("Failed to borrow {} {}: {}", amount, coin, e))
1578 }
1579
1580 pub async fn repay_spot_borrow(
1597 &self,
1598 coin: &str,
1599 amount: Option<Quantity>,
1600 ) -> anyhow::Result<BybitNoConvertRepayResponse> {
1601 let amount_str = amount.as_ref().map(|q| q.to_string());
1602 self.inner
1603 .no_convert_repay(coin, amount_str.as_deref())
1604 .await
1605 .map_err(|e| anyhow::anyhow!("Failed to repay spot borrow for {coin}: {e}"))
1606 }
1607
1608 async fn generate_spot_position_reports_from_wallet(
1616 &self,
1617 account_id: AccountId,
1618 instrument_id: Option<InstrumentId>,
1619 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1620 let params = BybitWalletBalanceParams {
1621 account_type: BybitAccountType::Unified,
1622 coin: None,
1623 };
1624
1625 let response = self.inner.get_wallet_balance(¶ms).await?;
1626 let ts_init = self.generate_ts_init();
1627
1628 let mut wallet_by_coin: HashMap<Ustr, Decimal> = HashMap::new();
1629
1630 for wallet in &response.result.list {
1631 for coin_balance in &wallet.coin {
1632 let balance = coin_balance.wallet_balance - coin_balance.spot_borrow;
1633 *wallet_by_coin
1634 .entry(coin_balance.coin)
1635 .or_insert(Decimal::ZERO) += balance;
1636 }
1637 }
1638
1639 let mut reports = Vec::new();
1640
1641 if let Some(instrument_id) = instrument_id {
1642 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner()) {
1643 let base_currency = instrument
1644 .base_currency()
1645 .expect("SPOT instrument should have base currency");
1646 let coin = base_currency.code;
1647 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(Decimal::ZERO);
1648
1649 let side = if wallet_balance > Decimal::ZERO {
1650 PositionSideSpecified::Long
1651 } else if wallet_balance < Decimal::ZERO {
1652 PositionSideSpecified::Short
1653 } else {
1654 PositionSideSpecified::Flat
1655 };
1656
1657 let abs_balance = wallet_balance.abs();
1658 let quantity = Quantity::from_decimal_dp(abs_balance, instrument.size_precision())?;
1659
1660 let report = PositionStatusReport::new(
1661 account_id,
1662 instrument_id,
1663 side,
1664 quantity,
1665 ts_init,
1666 ts_init,
1667 None,
1668 None,
1669 None,
1670 );
1671
1672 reports.push(report);
1673 }
1674 } else {
1675 for entry in self.instruments_cache.iter() {
1677 let symbol = entry.key();
1678 let instrument = entry.value();
1679 if !symbol.as_str().ends_with("-SPOT") {
1681 continue;
1682 }
1683
1684 let base_currency = match instrument.base_currency() {
1685 Some(currency) => currency,
1686 None => continue,
1687 };
1688
1689 let coin = base_currency.code;
1690 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(Decimal::ZERO);
1691
1692 if wallet_balance.is_zero() {
1693 continue;
1694 }
1695
1696 let side = if wallet_balance > Decimal::ZERO {
1697 PositionSideSpecified::Long
1698 } else if wallet_balance < Decimal::ZERO {
1699 PositionSideSpecified::Short
1700 } else {
1701 PositionSideSpecified::Flat
1702 };
1703
1704 let abs_balance = wallet_balance.abs();
1705 let quantity = Quantity::from_decimal_dp(abs_balance, instrument.size_precision())?;
1706
1707 if quantity.is_zero() {
1708 continue;
1709 }
1710
1711 let report = PositionStatusReport::new(
1712 account_id,
1713 instrument.id(),
1714 side,
1715 quantity,
1716 ts_init,
1717 ts_init,
1718 None,
1719 None,
1720 None,
1721 );
1722
1723 reports.push(report);
1724 }
1725 }
1726
1727 Ok(reports)
1728 }
1729
1730 #[allow(clippy::too_many_arguments)]
1741 pub async fn submit_order(
1742 &self,
1743 account_id: AccountId,
1744 product_type: BybitProductType,
1745 instrument_id: InstrumentId,
1746 client_order_id: ClientOrderId,
1747 order_side: OrderSide,
1748 order_type: OrderType,
1749 quantity: Quantity,
1750 time_in_force: TimeInForce,
1751 price: Option<Price>,
1752 reduce_only: bool,
1753 is_leverage: bool,
1754 ) -> anyhow::Result<OrderStatusReport> {
1755 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
1756 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
1757
1758 let bybit_side = match order_side {
1759 OrderSide::Buy => BybitOrderSide::Buy,
1760 OrderSide::Sell => BybitOrderSide::Sell,
1761 _ => anyhow::bail!("Invalid order side: {order_side:?}"),
1762 };
1763
1764 let bybit_order_type = match order_type {
1765 OrderType::Market => BybitOrderType::Market,
1766 OrderType::Limit => BybitOrderType::Limit,
1767 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1768 };
1769
1770 let bybit_tif = match time_in_force {
1771 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1772 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1773 TimeInForce::Fok => BybitTimeInForce::Fok,
1774 _ => anyhow::bail!("Unsupported time in force: {time_in_force:?}"),
1775 };
1776
1777 let mut order_entry = BybitBatchPlaceOrderEntryBuilder::default();
1778 order_entry.symbol(bybit_symbol.raw_symbol().to_string());
1779 order_entry.side(bybit_side);
1780 order_entry.order_type(bybit_order_type);
1781 order_entry.qty(quantity.to_string());
1782 order_entry.time_in_force(Some(bybit_tif));
1783 order_entry.order_link_id(client_order_id.to_string());
1784
1785 if let Some(price) = price {
1786 order_entry.price(Some(price.to_string()));
1787 }
1788
1789 if reduce_only {
1790 order_entry.reduce_only(Some(true));
1791 }
1792
1793 let is_leverage_value = if product_type == BybitProductType::Spot {
1795 Some(i32::from(is_leverage))
1796 } else {
1797 None
1798 };
1799 order_entry.is_leverage(is_leverage_value);
1800
1801 let order_entry = order_entry.build().map_err(|e| anyhow::anyhow!(e))?;
1802
1803 let mut params = BybitPlaceOrderParamsBuilder::default();
1804 params.category(product_type);
1805 params.order(order_entry);
1806
1807 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1808
1809 let body = serde_json::to_value(¶ms)?;
1810 let response = self.inner.place_order(&body).await?;
1811
1812 let order_id = response
1813 .result
1814 .order_id
1815 .ok_or_else(|| anyhow::anyhow!("No order_id in response"))?;
1816
1817 let mut query_params = BybitOpenOrdersParamsBuilder::default();
1819 query_params.category(product_type);
1820 query_params.order_id(order_id.as_str().to_string());
1821
1822 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
1823 let order_response: BybitOpenOrdersResponse = self
1824 .inner
1825 .send_request(
1826 Method::GET,
1827 "/v5/order/realtime",
1828 Some(&query_params),
1829 None,
1830 true,
1831 )
1832 .await?;
1833
1834 let order = order_response
1835 .result
1836 .list
1837 .into_iter()
1838 .next()
1839 .ok_or_else(|| anyhow::anyhow!("No order returned after submission"))?;
1840
1841 if order.order_status == crate::common::enums::BybitOrderStatus::Rejected
1844 && (order.cum_exec_qty.as_str() == "0" || order.cum_exec_qty.is_empty())
1845 {
1846 anyhow::bail!("Order rejected: {}", order.reject_reason);
1847 }
1848
1849 let ts_init = self.generate_ts_init();
1850
1851 parse_order_status_report(&order, &instrument, account_id, ts_init)
1852 }
1853
1854 pub async fn cancel_order(
1864 &self,
1865 account_id: AccountId,
1866 product_type: BybitProductType,
1867 instrument_id: InstrumentId,
1868 client_order_id: Option<ClientOrderId>,
1869 venue_order_id: Option<VenueOrderId>,
1870 ) -> anyhow::Result<OrderStatusReport> {
1871 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
1872 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
1873
1874 let mut cancel_entry = BybitBatchCancelOrderEntryBuilder::default();
1875 cancel_entry.symbol(bybit_symbol.raw_symbol().to_string());
1876
1877 if let Some(venue_order_id) = venue_order_id {
1878 cancel_entry.order_id(venue_order_id.to_string());
1879 } else if let Some(client_order_id) = client_order_id {
1880 cancel_entry.order_link_id(client_order_id.to_string());
1881 } else {
1882 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1883 }
1884
1885 let cancel_entry = cancel_entry.build().map_err(|e| anyhow::anyhow!(e))?;
1886
1887 let mut params = BybitCancelOrderParamsBuilder::default();
1888 params.category(product_type);
1889 params.order(cancel_entry);
1890
1891 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1892 let body = serde_json::to_vec(¶ms)?;
1893
1894 let response: BybitPlaceOrderResponse = self
1895 .inner
1896 .send_request::<_, ()>(Method::POST, "/v5/order/cancel", None, Some(body), true)
1897 .await?;
1898
1899 let order_id = response
1900 .result
1901 .order_id
1902 .ok_or_else(|| anyhow::anyhow!("No order_id in cancel response"))?;
1903
1904 let mut query_params = BybitOpenOrdersParamsBuilder::default();
1906 query_params.category(product_type);
1907 query_params.order_id(order_id.as_str().to_string());
1908
1909 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
1910 let order_response: BybitOrderHistoryResponse = self
1911 .inner
1912 .send_request(
1913 Method::GET,
1914 "/v5/order/history",
1915 Some(&query_params),
1916 None,
1917 true,
1918 )
1919 .await?;
1920
1921 let order = order_response
1922 .result
1923 .list
1924 .into_iter()
1925 .next()
1926 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1927
1928 let ts_init = self.generate_ts_init();
1929
1930 parse_order_status_report(&order, &instrument, account_id, ts_init)
1931 }
1932
1933 pub async fn batch_cancel_orders(
1943 &self,
1944 account_id: AccountId,
1945 product_type: BybitProductType,
1946 instrument_ids: Vec<InstrumentId>,
1947 client_order_ids: Vec<Option<ClientOrderId>>,
1948 venue_order_ids: Vec<Option<VenueOrderId>>,
1949 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1950 if instrument_ids.len() != client_order_ids.len()
1951 || instrument_ids.len() != venue_order_ids.len()
1952 {
1953 anyhow::bail!(
1954 "instrument_ids, client_order_ids, and venue_order_ids must have the same length"
1955 );
1956 }
1957
1958 if instrument_ids.is_empty() {
1959 return Ok(Vec::new());
1960 }
1961
1962 if instrument_ids.len() > 20 {
1963 anyhow::bail!("Batch cancel limit is 20 orders per request");
1964 }
1965
1966 let mut cancel_entries = Vec::new();
1967
1968 for ((instrument_id, client_order_id), venue_order_id) in instrument_ids
1969 .iter()
1970 .zip(client_order_ids.iter())
1971 .zip(venue_order_ids.iter())
1972 {
1973 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
1974 let mut cancel_entry = BybitBatchCancelOrderEntryBuilder::default();
1975 cancel_entry.symbol(bybit_symbol.raw_symbol().to_string());
1976
1977 if let Some(venue_order_id) = venue_order_id {
1978 cancel_entry.order_id(venue_order_id.to_string());
1979 } else if let Some(client_order_id) = client_order_id {
1980 cancel_entry.order_link_id(client_order_id.to_string());
1981 } else {
1982 anyhow::bail!(
1983 "Either client_order_id or venue_order_id must be provided for each order"
1984 );
1985 }
1986
1987 cancel_entries.push(cancel_entry.build().map_err(|e| anyhow::anyhow!(e))?);
1988 }
1989
1990 let mut params = BybitBatchCancelOrderParamsBuilder::default();
1991 params.category(product_type);
1992 params.request(cancel_entries);
1993
1994 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1995 let body = serde_json::to_vec(¶ms)?;
1996
1997 let _response: BybitPlaceOrderResponse = self
1998 .inner
1999 .send_request::<_, ()>(
2000 Method::POST,
2001 "/v5/order/cancel-batch",
2002 None,
2003 Some(body),
2004 true,
2005 )
2006 .await?;
2007
2008 let mut reports = Vec::new();
2010 for (instrument_id, (client_order_id, venue_order_id)) in instrument_ids
2011 .iter()
2012 .zip(client_order_ids.iter().zip(venue_order_ids.iter()))
2013 {
2014 let Ok(instrument) = self.instrument_from_cache(&instrument_id.symbol) else {
2015 tracing::debug!(
2016 symbol = %instrument_id.symbol,
2017 "Skipping cancelled order report for instrument not in cache"
2018 );
2019 continue;
2020 };
2021
2022 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2023
2024 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2025 query_params.category(product_type);
2026 query_params.symbol(bybit_symbol.raw_symbol().to_string());
2027
2028 if let Some(venue_order_id) = venue_order_id {
2029 query_params.order_id(venue_order_id.to_string());
2030 } else if let Some(client_order_id) = client_order_id {
2031 query_params.order_link_id(client_order_id.to_string());
2032 }
2033
2034 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2035 let order_response: BybitOrderHistoryResponse = self
2036 .inner
2037 .send_request(
2038 Method::GET,
2039 "/v5/order/history",
2040 Some(&query_params),
2041 None,
2042 true,
2043 )
2044 .await?;
2045
2046 if let Some(order) = order_response.result.list.into_iter().next() {
2047 let ts_init = self.generate_ts_init();
2048 let report = parse_order_status_report(&order, &instrument, account_id, ts_init)?;
2049 reports.push(report);
2050 }
2051 }
2052
2053 Ok(reports)
2054 }
2055
2056 pub async fn cancel_all_orders(
2065 &self,
2066 account_id: AccountId,
2067 product_type: BybitProductType,
2068 instrument_id: InstrumentId,
2069 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2070 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2071 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2072
2073 let mut params = BybitCancelAllOrdersParamsBuilder::default();
2074 params.category(product_type);
2075 params.symbol(bybit_symbol.raw_symbol().to_string());
2076
2077 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2078 let body = serde_json::to_vec(¶ms)?;
2079
2080 let _response: crate::common::models::BybitListResponse<serde_json::Value> = self
2081 .inner
2082 .send_request::<_, ()>(Method::POST, "/v5/order/cancel-all", None, Some(body), true)
2083 .await?;
2084
2085 let mut query_params = BybitOrderHistoryParamsBuilder::default();
2087 query_params.category(product_type);
2088 query_params.symbol(bybit_symbol.raw_symbol().to_string());
2089 query_params.limit(50u32);
2090
2091 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2092 let order_response: BybitOrderHistoryResponse = self
2093 .inner
2094 .send_request(
2095 Method::GET,
2096 "/v5/order/history",
2097 Some(&query_params),
2098 None,
2099 true,
2100 )
2101 .await?;
2102
2103 let ts_init = self.generate_ts_init();
2104
2105 let mut reports = Vec::new();
2106 for order in order_response.result.list {
2107 if let Ok(report) = parse_order_status_report(&order, &instrument, account_id, ts_init)
2108 {
2109 reports.push(report);
2110 }
2111 }
2112
2113 Ok(reports)
2114 }
2115
2116 #[allow(clippy::too_many_arguments)]
2127 pub async fn modify_order(
2128 &self,
2129 account_id: AccountId,
2130 product_type: BybitProductType,
2131 instrument_id: InstrumentId,
2132 client_order_id: Option<ClientOrderId>,
2133 venue_order_id: Option<VenueOrderId>,
2134 quantity: Option<Quantity>,
2135 price: Option<Price>,
2136 ) -> anyhow::Result<OrderStatusReport> {
2137 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2138 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2139
2140 let mut amend_entry = BybitBatchAmendOrderEntryBuilder::default();
2141 amend_entry.symbol(bybit_symbol.raw_symbol().to_string());
2142
2143 if let Some(venue_order_id) = venue_order_id {
2144 amend_entry.order_id(venue_order_id.to_string());
2145 } else if let Some(client_order_id) = client_order_id {
2146 amend_entry.order_link_id(client_order_id.to_string());
2147 } else {
2148 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2149 }
2150
2151 if let Some(quantity) = quantity {
2152 amend_entry.qty(Some(quantity.to_string()));
2153 }
2154
2155 if let Some(price) = price {
2156 amend_entry.price(Some(price.to_string()));
2157 }
2158
2159 let amend_entry = amend_entry.build().map_err(|e| anyhow::anyhow!(e))?;
2160
2161 let mut params = BybitAmendOrderParamsBuilder::default();
2162 params.category(product_type);
2163 params.order(amend_entry);
2164
2165 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2166 let body = serde_json::to_vec(¶ms)?;
2167
2168 let response: BybitPlaceOrderResponse = self
2169 .inner
2170 .send_request::<_, ()>(Method::POST, "/v5/order/amend", None, Some(body), true)
2171 .await?;
2172
2173 let order_id = response
2174 .result
2175 .order_id
2176 .ok_or_else(|| anyhow::anyhow!("No order_id in amend response"))?;
2177
2178 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2180 query_params.category(product_type);
2181 query_params.order_id(order_id.as_str().to_string());
2182
2183 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2184 let order_response: BybitOpenOrdersResponse = self
2185 .inner
2186 .send_request(
2187 Method::GET,
2188 "/v5/order/realtime",
2189 Some(&query_params),
2190 None,
2191 true,
2192 )
2193 .await?;
2194
2195 let order = order_response
2196 .result
2197 .list
2198 .into_iter()
2199 .next()
2200 .ok_or_else(|| anyhow::anyhow!("No order returned after modification"))?;
2201
2202 let ts_init = self.generate_ts_init();
2203
2204 parse_order_status_report(&order, &instrument, account_id, ts_init)
2205 }
2206
2207 pub async fn query_order(
2216 &self,
2217 account_id: AccountId,
2218 product_type: BybitProductType,
2219 instrument_id: InstrumentId,
2220 client_order_id: Option<ClientOrderId>,
2221 venue_order_id: Option<VenueOrderId>,
2222 ) -> anyhow::Result<Option<OrderStatusReport>> {
2223 tracing::info!(
2224 "query_order called: instrument_id={}, client_order_id={:?}, venue_order_id={:?}",
2225 instrument_id,
2226 client_order_id,
2227 venue_order_id
2228 );
2229
2230 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2231
2232 let mut params = BybitOpenOrdersParamsBuilder::default();
2233 params.category(product_type);
2234 params.symbol(bybit_symbol.raw_symbol().to_string());
2236
2237 if let Some(venue_order_id) = venue_order_id {
2238 params.order_id(venue_order_id.to_string());
2239 } else if let Some(client_order_id) = client_order_id {
2240 params.order_link_id(client_order_id.to_string());
2241 } else {
2242 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2243 }
2244
2245 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2246 let mut response: BybitOpenOrdersResponse = self
2247 .inner
2248 .send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
2249 .await?;
2250
2251 if response.result.list.is_empty() {
2252 tracing::debug!("Order not found in open orders, trying with StopOrder filter");
2253
2254 let mut stop_params = BybitOpenOrdersParamsBuilder::default();
2255 stop_params.category(product_type);
2256 stop_params.symbol(bybit_symbol.raw_symbol().to_string());
2257 stop_params.order_filter("StopOrder".to_string());
2258
2259 if let Some(venue_order_id) = venue_order_id {
2260 stop_params.order_id(venue_order_id.to_string());
2261 } else if let Some(client_order_id) = client_order_id {
2262 stop_params.order_link_id(client_order_id.to_string());
2263 }
2264
2265 let stop_params = stop_params.build().map_err(|e| anyhow::anyhow!(e))?;
2266 response = self
2267 .inner
2268 .send_request(
2269 Method::GET,
2270 "/v5/order/realtime",
2271 Some(&stop_params),
2272 None,
2273 true,
2274 )
2275 .await?;
2276 }
2277
2278 if response.result.list.is_empty() {
2280 tracing::debug!("Order not found in open orders, checking order history");
2281
2282 let mut history_params = BybitOrderHistoryParamsBuilder::default();
2283 history_params.category(product_type);
2284 history_params.symbol(bybit_symbol.raw_symbol().to_string());
2285
2286 if let Some(venue_order_id) = venue_order_id {
2287 history_params.order_id(venue_order_id.to_string());
2288 } else if let Some(client_order_id) = client_order_id {
2289 history_params.order_link_id(client_order_id.to_string());
2290 }
2291
2292 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2293
2294 let mut history_response: BybitOrderHistoryResponse = self
2295 .inner
2296 .send_request(
2297 Method::GET,
2298 "/v5/order/history",
2299 Some(&history_params),
2300 None,
2301 true,
2302 )
2303 .await?;
2304
2305 if history_response.result.list.is_empty() {
2306 tracing::debug!("Order not found in order history, trying with StopOrder filter");
2307
2308 let mut stop_history_params = BybitOrderHistoryParamsBuilder::default();
2309 stop_history_params.category(product_type);
2310 stop_history_params.symbol(bybit_symbol.raw_symbol().to_string());
2311 stop_history_params.order_filter("StopOrder".to_string());
2312
2313 if let Some(venue_order_id) = venue_order_id {
2314 stop_history_params.order_id(venue_order_id.to_string());
2315 } else if let Some(client_order_id) = client_order_id {
2316 stop_history_params.order_link_id(client_order_id.to_string());
2317 }
2318
2319 let stop_history_params = stop_history_params
2320 .build()
2321 .map_err(|e| anyhow::anyhow!(e))?;
2322
2323 history_response = self
2324 .inner
2325 .send_request(
2326 Method::GET,
2327 "/v5/order/history",
2328 Some(&stop_history_params),
2329 None,
2330 true,
2331 )
2332 .await?;
2333
2334 if history_response.result.list.is_empty() {
2335 tracing::debug!(
2336 "Order not found in order history with StopOrder filter either"
2337 );
2338 return Ok(None);
2339 }
2340 }
2341
2342 response.result.list = history_response.result.list;
2344 }
2345
2346 let order = &response.result.list[0];
2347 let ts_init = self.generate_ts_init();
2348
2349 tracing::debug!(
2350 "Query order response: symbol={}, order_id={}, order_link_id={}",
2351 order.symbol.as_str(),
2352 order.order_id.as_str(),
2353 order.order_link_id.as_str()
2354 );
2355
2356 let instrument = self
2357 .instrument_from_cache(&instrument_id.symbol)
2358 .map_err(|e| {
2359 tracing::error!(
2360 "Instrument cache miss for symbol '{}': {}",
2361 instrument_id.symbol.as_str(),
2362 e
2363 );
2364 anyhow::anyhow!(
2365 "Failed to query order {}: {}",
2366 client_order_id
2367 .as_ref()
2368 .map(|id| id.to_string())
2369 .or_else(|| venue_order_id.as_ref().map(|id| id.to_string()))
2370 .unwrap_or_else(|| "unknown".to_string()),
2371 e
2372 )
2373 })?;
2374
2375 tracing::debug!("Retrieved instrument from cache: id={}", instrument.id());
2376
2377 let report =
2378 parse_order_status_report(order, &instrument, account_id, ts_init).map_err(|e| {
2379 tracing::error!(
2380 "Failed to parse order status report for {}: {}",
2381 order.order_link_id.as_str(),
2382 e
2383 );
2384 e
2385 })?;
2386
2387 tracing::debug!(
2388 "Successfully created OrderStatusReport for {}",
2389 order.order_link_id.as_str()
2390 );
2391
2392 Ok(Some(report))
2393 }
2394
2395 pub async fn request_instruments(
2401 &self,
2402 product_type: BybitProductType,
2403 symbol: Option<String>,
2404 ) -> anyhow::Result<Vec<InstrumentAny>> {
2405 let ts_init = self.generate_ts_init();
2406
2407 let mut instruments = Vec::new();
2408
2409 let default_fee_rate = |symbol: ustr::Ustr| BybitFeeRate {
2410 symbol,
2411 taker_fee_rate: "0.001".to_string(),
2412 maker_fee_rate: "0.001".to_string(),
2413 base_coin: None,
2414 };
2415
2416 match product_type {
2417 BybitProductType::Spot => {
2418 let fee_map: HashMap<_, _> = {
2420 let mut fee_params = BybitFeeRateParamsBuilder::default();
2421 fee_params.category(product_type);
2422 if let Ok(params) = fee_params.build() {
2423 match self.inner.get_fee_rate(¶ms).await {
2424 Ok(fee_response) => fee_response
2425 .result
2426 .list
2427 .into_iter()
2428 .map(|f| (f.symbol, f))
2429 .collect(),
2430 Err(BybitHttpError::MissingCredentials) => {
2431 tracing::warn!("Missing credentials for fee rates, using defaults");
2432 HashMap::new()
2433 }
2434 Err(e) => return Err(e.into()),
2435 }
2436 } else {
2437 HashMap::new()
2438 }
2439 };
2440
2441 let mut cursor: Option<String> = None;
2442
2443 loop {
2444 let params = BybitInstrumentsInfoParams {
2445 category: product_type,
2446 symbol: symbol.clone(),
2447 status: None,
2448 base_coin: None,
2449 limit: Some(1000),
2450 cursor: cursor.clone(),
2451 };
2452
2453 let response: BybitInstrumentSpotResponse =
2454 self.inner.get_instruments(¶ms).await?;
2455
2456 for definition in response.result.list {
2457 let fee_rate = fee_map
2458 .get(&definition.symbol)
2459 .cloned()
2460 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2461 if let Ok(instrument) =
2462 parse_spot_instrument(&definition, &fee_rate, ts_init, ts_init)
2463 {
2464 instruments.push(instrument);
2465 }
2466 }
2467
2468 cursor = response.result.next_page_cursor;
2469 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2470 break;
2471 }
2472 }
2473 }
2474 BybitProductType::Linear => {
2475 let fee_map: HashMap<_, _> = {
2477 let mut fee_params = BybitFeeRateParamsBuilder::default();
2478 fee_params.category(product_type);
2479 if let Ok(params) = fee_params.build() {
2480 match self.inner.get_fee_rate(¶ms).await {
2481 Ok(fee_response) => fee_response
2482 .result
2483 .list
2484 .into_iter()
2485 .map(|f| (f.symbol, f))
2486 .collect(),
2487 Err(BybitHttpError::MissingCredentials) => {
2488 tracing::warn!("Missing credentials for fee rates, using defaults");
2489 HashMap::new()
2490 }
2491 Err(e) => return Err(e.into()),
2492 }
2493 } else {
2494 HashMap::new()
2495 }
2496 };
2497
2498 let mut cursor: Option<String> = None;
2499
2500 loop {
2501 let params = BybitInstrumentsInfoParams {
2502 category: product_type,
2503 symbol: symbol.clone(),
2504 status: None,
2505 base_coin: None,
2506 limit: Some(1000),
2507 cursor: cursor.clone(),
2508 };
2509
2510 let response: BybitInstrumentLinearResponse =
2511 self.inner.get_instruments(¶ms).await?;
2512
2513 for definition in response.result.list {
2514 let fee_rate = fee_map
2515 .get(&definition.symbol)
2516 .cloned()
2517 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2518 if let Ok(instrument) =
2519 parse_linear_instrument(&definition, &fee_rate, ts_init, ts_init)
2520 {
2521 instruments.push(instrument);
2522 }
2523 }
2524
2525 cursor = response.result.next_page_cursor;
2526 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2527 break;
2528 }
2529 }
2530 }
2531 BybitProductType::Inverse => {
2532 let fee_map: HashMap<_, _> = {
2534 let mut fee_params = BybitFeeRateParamsBuilder::default();
2535 fee_params.category(product_type);
2536 if let Ok(params) = fee_params.build() {
2537 match self.inner.get_fee_rate(¶ms).await {
2538 Ok(fee_response) => fee_response
2539 .result
2540 .list
2541 .into_iter()
2542 .map(|f| (f.symbol, f))
2543 .collect(),
2544 Err(BybitHttpError::MissingCredentials) => {
2545 tracing::warn!("Missing credentials for fee rates, using defaults");
2546 HashMap::new()
2547 }
2548 Err(e) => return Err(e.into()),
2549 }
2550 } else {
2551 HashMap::new()
2552 }
2553 };
2554
2555 let mut cursor: Option<String> = None;
2556
2557 loop {
2558 let params = BybitInstrumentsInfoParams {
2559 category: product_type,
2560 symbol: symbol.clone(),
2561 status: None,
2562 base_coin: None,
2563 limit: Some(1000),
2564 cursor: cursor.clone(),
2565 };
2566
2567 let response: BybitInstrumentInverseResponse =
2568 self.inner.get_instruments(¶ms).await?;
2569
2570 for definition in response.result.list {
2571 let fee_rate = fee_map
2572 .get(&definition.symbol)
2573 .cloned()
2574 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2575 if let Ok(instrument) =
2576 parse_inverse_instrument(&definition, &fee_rate, ts_init, ts_init)
2577 {
2578 instruments.push(instrument);
2579 }
2580 }
2581
2582 cursor = response.result.next_page_cursor;
2583 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2584 break;
2585 }
2586 }
2587 }
2588 BybitProductType::Option => {
2589 let mut cursor: Option<String> = None;
2590
2591 loop {
2592 let params = BybitInstrumentsInfoParams {
2593 category: product_type,
2594 symbol: symbol.clone(),
2595 status: None,
2596 base_coin: None,
2597 limit: Some(1000),
2598 cursor: cursor.clone(),
2599 };
2600
2601 let response: BybitInstrumentOptionResponse =
2602 self.inner.get_instruments(¶ms).await?;
2603
2604 for definition in response.result.list {
2605 if let Ok(instrument) =
2606 parse_option_instrument(&definition, ts_init, ts_init)
2607 {
2608 instruments.push(instrument);
2609 }
2610 }
2611
2612 cursor = response.result.next_page_cursor;
2613 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2614 break;
2615 }
2616 }
2617 }
2618 }
2619
2620 for instrument in &instruments {
2621 self.cache_instrument(instrument.clone());
2622 }
2623
2624 Ok(instruments)
2625 }
2626
2627 pub async fn request_trades(
2647 &self,
2648 product_type: BybitProductType,
2649 instrument_id: InstrumentId,
2650 limit: Option<u32>,
2651 ) -> anyhow::Result<Vec<TradeTick>> {
2652 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2653 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2654
2655 let mut params_builder = BybitTradesParamsBuilder::default();
2656 params_builder.category(product_type);
2657 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
2658 if let Some(limit_val) = limit {
2659 params_builder.limit(limit_val);
2660 }
2661
2662 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
2663 let response = self.inner.get_recent_trades(¶ms).await?;
2664
2665 let ts_init = self.generate_ts_init();
2666 let mut trades = Vec::new();
2667
2668 for trade in response.result.list {
2669 if let Ok(trade_tick) = parse_trade_tick(&trade, &instrument, ts_init) {
2670 trades.push(trade_tick);
2671 }
2672 }
2673
2674 Ok(trades)
2675 }
2676
2677 pub async fn request_bars(
2690 &self,
2691 product_type: BybitProductType,
2692 bar_type: BarType,
2693 start: Option<DateTime<Utc>>,
2694 end: Option<DateTime<Utc>>,
2695 limit: Option<u32>,
2696 timestamp_on_close: bool,
2697 ) -> anyhow::Result<Vec<Bar>> {
2698 let instrument = self.instrument_from_cache(&bar_type.instrument_id().symbol)?;
2699 let bybit_symbol = BybitSymbol::new(bar_type.instrument_id().symbol.as_str())?;
2700
2701 let interval = bar_spec_to_bybit_interval(
2703 bar_type.spec().aggregation,
2704 bar_type.spec().step.get() as u64,
2705 )?;
2706
2707 let start_ms = start.map(|dt| dt.timestamp_millis());
2708 let mut all_bars: Vec<Bar> = Vec::new();
2709 let mut seen_timestamps: std::collections::HashSet<i64> = std::collections::HashSet::new();
2710
2711 let mut current_end = end.map(|dt| dt.timestamp_millis());
2720 let mut page_count = 0;
2721
2722 loop {
2723 page_count += 1;
2724
2725 let mut params_builder = BybitKlinesParamsBuilder::default();
2726 params_builder.category(product_type);
2727 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
2728 params_builder.interval(interval);
2729 params_builder.limit(1000u32); if let Some(start_val) = start_ms {
2732 params_builder.start(start_val);
2733 }
2734 if let Some(end_val) = current_end {
2735 params_builder.end(end_val);
2736 }
2737
2738 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
2739 let response = self.inner.get_klines(¶ms).await?;
2740
2741 let klines = response.result.list;
2742 if klines.is_empty() {
2743 break;
2744 }
2745
2746 let mut sorted_klines = klines;
2748 sorted_klines.sort_by_key(|k| k.start.parse::<i64>().unwrap_or(0));
2749
2750 let ts_init = self.generate_ts_init();
2752 let mut new_bars = Vec::new();
2753
2754 for kline in &sorted_klines {
2755 let start_time = kline.start.parse::<i64>().unwrap_or(0);
2756 if !seen_timestamps.contains(&start_time)
2757 && let Ok(bar) =
2758 parse_kline_bar(kline, &instrument, bar_type, timestamp_on_close, ts_init)
2759 {
2760 new_bars.push(bar);
2761 }
2762 }
2763
2764 if new_bars.is_empty() {
2766 break;
2767 }
2768
2769 all_bars.splice(0..0, new_bars);
2772 seen_timestamps.extend(
2773 sorted_klines
2774 .iter()
2775 .filter_map(|k| k.start.parse::<i64>().ok()),
2776 );
2777
2778 if let Some(limit_val) = limit
2780 && all_bars.len() >= limit_val as usize
2781 {
2782 break;
2783 }
2784
2785 let earliest_bar_time = sorted_klines[0].start.parse::<i64>().unwrap_or(0);
2788 if let Some(start_val) = start_ms
2789 && earliest_bar_time <= start_val
2790 {
2791 break;
2792 }
2793
2794 current_end = Some(earliest_bar_time - 1);
2795
2796 if page_count > 100 {
2798 break;
2799 }
2800 }
2801
2802 if let Some(limit_val) = limit {
2805 let limit_usize = limit_val as usize;
2806 if all_bars.len() > limit_usize {
2807 let start_idx = all_bars.len() - limit_usize;
2808 return Ok(all_bars[start_idx..].to_vec());
2809 }
2810 }
2811
2812 Ok(all_bars)
2813 }
2814
2815 pub async fn request_fee_rates(
2827 &self,
2828 product_type: BybitProductType,
2829 symbol: Option<String>,
2830 base_coin: Option<String>,
2831 ) -> anyhow::Result<Vec<BybitFeeRate>> {
2832 let params = BybitFeeRateParams {
2833 category: product_type,
2834 symbol,
2835 base_coin,
2836 };
2837
2838 let response = self.inner.get_fee_rate(¶ms).await?;
2839 Ok(response.result.list)
2840 }
2841
2842 pub async fn request_account_state(
2854 &self,
2855 account_type: BybitAccountType,
2856 account_id: AccountId,
2857 ) -> anyhow::Result<AccountState> {
2858 let params = BybitWalletBalanceParams {
2859 account_type,
2860 coin: None,
2861 };
2862
2863 let response = self.inner.get_wallet_balance(¶ms).await?;
2864 let ts_init = self.generate_ts_init();
2865
2866 let wallet_balance = response
2868 .result
2869 .list
2870 .first()
2871 .ok_or_else(|| anyhow::anyhow!("No wallet balance found in response"))?;
2872
2873 parse_account_state(wallet_balance, account_id, ts_init)
2874 }
2875
2876 #[allow(clippy::too_many_arguments)]
2887 pub async fn request_order_status_reports(
2888 &self,
2889 account_id: AccountId,
2890 product_type: BybitProductType,
2891 instrument_id: Option<InstrumentId>,
2892 open_only: bool,
2893 start: Option<DateTime<Utc>>,
2894 end: Option<DateTime<Utc>>,
2895 limit: Option<u32>,
2896 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2897 let symbol_param = if let Some(id) = instrument_id.as_ref() {
2899 let symbol_str = id.symbol.as_str();
2900 if symbol_str.is_empty() {
2901 None
2902 } else {
2903 Some(BybitSymbol::new(symbol_str)?.raw_symbol().to_string())
2904 }
2905 } else {
2906 None
2907 };
2908
2909 let settle_coins_to_query: Vec<Option<String>> =
2912 if product_type == BybitProductType::Linear && symbol_param.is_none() {
2913 vec![Some("USDT".to_string()), Some("USDC".to_string())]
2914 } else {
2915 match product_type {
2916 BybitProductType::Inverse => vec![None],
2917 _ => vec![None],
2918 }
2919 };
2920
2921 let mut all_collected_orders = Vec::new();
2922 let mut total_collected_across_coins = 0;
2923
2924 for settle_coin in settle_coins_to_query {
2925 let remaining_limit = if let Some(limit) = limit {
2926 let remaining = (limit as usize).saturating_sub(total_collected_across_coins);
2927 if remaining == 0 {
2928 break;
2929 }
2930 Some(remaining as u32)
2931 } else {
2932 None
2933 };
2934
2935 let orders_for_coin = if open_only {
2936 let mut all_orders = Vec::new();
2937 let mut cursor: Option<String> = None;
2938 let mut total_orders = 0;
2939
2940 loop {
2941 let remaining = if let Some(limit) = remaining_limit {
2942 (limit as usize).saturating_sub(total_orders)
2943 } else {
2944 usize::MAX
2945 };
2946
2947 if remaining == 0 {
2948 break;
2949 }
2950
2951 let page_limit = std::cmp::min(remaining, 50);
2953
2954 let mut p = BybitOpenOrdersParamsBuilder::default();
2955 p.category(product_type);
2956 if let Some(symbol) = symbol_param.clone() {
2957 p.symbol(symbol);
2958 }
2959 if let Some(coin) = settle_coin.clone() {
2960 p.settle_coin(coin);
2961 }
2962 p.limit(page_limit as u32);
2963 if let Some(c) = cursor {
2964 p.cursor(c);
2965 }
2966 let params = p.build().map_err(|e| anyhow::anyhow!(e))?;
2967 let response: BybitOpenOrdersResponse = self
2968 .inner
2969 .send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
2970 .await?;
2971
2972 total_orders += response.result.list.len();
2973 all_orders.extend(response.result.list);
2974
2975 cursor = response.result.next_page_cursor;
2976 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2977 break;
2978 }
2979 }
2980
2981 all_orders
2982 } else {
2983 let mut all_orders = Vec::new();
2986 let mut open_orders = Vec::new();
2987 let mut cursor: Option<String> = None;
2988 let mut total_open_orders = 0;
2989
2990 loop {
2991 let remaining = if let Some(limit) = remaining_limit {
2992 (limit as usize).saturating_sub(total_open_orders)
2993 } else {
2994 usize::MAX
2995 };
2996
2997 if remaining == 0 {
2998 break;
2999 }
3000
3001 let page_limit = std::cmp::min(remaining, 50);
3003
3004 let mut open_params = BybitOpenOrdersParamsBuilder::default();
3005 open_params.category(product_type);
3006 if let Some(symbol) = symbol_param.clone() {
3007 open_params.symbol(symbol);
3008 }
3009 if let Some(coin) = settle_coin.clone() {
3010 open_params.settle_coin(coin);
3011 }
3012 open_params.limit(page_limit as u32);
3013 if let Some(c) = cursor {
3014 open_params.cursor(c);
3015 }
3016 let open_params = open_params.build().map_err(|e| anyhow::anyhow!(e))?;
3017 let open_response: BybitOpenOrdersResponse = self
3018 .inner
3019 .send_request(
3020 Method::GET,
3021 "/v5/order/realtime",
3022 Some(&open_params),
3023 None,
3024 true,
3025 )
3026 .await?;
3027
3028 total_open_orders += open_response.result.list.len();
3029 open_orders.extend(open_response.result.list);
3030
3031 cursor = open_response.result.next_page_cursor;
3032 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3033 break;
3034 }
3035 }
3036
3037 let seen_order_ids: std::collections::HashSet<Ustr> =
3038 open_orders.iter().map(|o| o.order_id).collect();
3039
3040 all_orders.extend(open_orders);
3041
3042 let mut cursor: Option<String> = None;
3043 let mut total_history_orders = 0;
3044
3045 loop {
3046 let total_orders = total_open_orders + total_history_orders;
3047 let remaining = if let Some(limit) = remaining_limit {
3048 (limit as usize).saturating_sub(total_orders)
3049 } else {
3050 usize::MAX
3051 };
3052
3053 if remaining == 0 {
3054 break;
3055 }
3056
3057 let page_limit = std::cmp::min(remaining, 50);
3059
3060 let mut history_params = BybitOrderHistoryParamsBuilder::default();
3061 history_params.category(product_type);
3062 if let Some(symbol) = symbol_param.clone() {
3063 history_params.symbol(symbol);
3064 }
3065 if let Some(coin) = settle_coin.clone() {
3066 history_params.settle_coin(coin);
3067 }
3068 if let Some(start) = start {
3069 history_params.start_time(start.timestamp_millis());
3070 }
3071 if let Some(end) = end {
3072 history_params.end_time(end.timestamp_millis());
3073 }
3074 history_params.limit(page_limit as u32);
3075 if let Some(c) = cursor {
3076 history_params.cursor(c);
3077 }
3078 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
3079 let history_response: BybitOrderHistoryResponse = self
3080 .inner
3081 .send_request(
3082 Method::GET,
3083 "/v5/order/history",
3084 Some(&history_params),
3085 None,
3086 true,
3087 )
3088 .await?;
3089
3090 for order in history_response.result.list {
3092 if !seen_order_ids.contains(&order.order_id) {
3093 all_orders.push(order);
3094 total_history_orders += 1;
3095 }
3096 }
3097
3098 cursor = history_response.result.next_page_cursor;
3099 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3100 break;
3101 }
3102 }
3103
3104 all_orders
3105 };
3106
3107 total_collected_across_coins += orders_for_coin.len();
3108 all_collected_orders.extend(orders_for_coin);
3109 }
3110
3111 let ts_init = self.generate_ts_init();
3112
3113 let mut reports = Vec::new();
3114 for order in all_collected_orders {
3115 if let Some(ref instrument_id) = instrument_id {
3116 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
3117 if let Ok(report) =
3118 parse_order_status_report(&order, &instrument, account_id, ts_init)
3119 {
3120 reports.push(report);
3121 }
3122 } else {
3123 if !order.symbol.is_empty() {
3126 let symbol_with_product =
3127 Symbol::from_ustr_unchecked(make_bybit_symbol(order.symbol, product_type));
3128
3129 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3130 tracing::debug!(
3131 symbol = %order.symbol,
3132 full_symbol = %symbol_with_product,
3133 "Skipping order report for instrument not in cache"
3134 );
3135 continue;
3136 };
3137
3138 match parse_order_status_report(&order, &instrument, account_id, ts_init) {
3139 Ok(report) => reports.push(report),
3140 Err(e) => {
3141 tracing::error!("Failed to parse order status report: {e}");
3142 }
3143 }
3144 }
3145 }
3146 }
3147
3148 Ok(reports)
3149 }
3150
3151 pub async fn request_fill_reports(
3163 &self,
3164 account_id: AccountId,
3165 product_type: BybitProductType,
3166 instrument_id: Option<InstrumentId>,
3167 start: Option<i64>,
3168 end: Option<i64>,
3169 limit: Option<u32>,
3170 ) -> anyhow::Result<Vec<FillReport>> {
3171 let symbol = if let Some(id) = instrument_id {
3173 let bybit_symbol = BybitSymbol::new(id.symbol.as_str())?;
3174 Some(bybit_symbol.raw_symbol().to_string())
3175 } else {
3176 None
3177 };
3178
3179 let mut all_executions = Vec::new();
3181 let mut cursor: Option<String> = None;
3182 let mut total_executions = 0;
3183
3184 loop {
3185 let remaining = if let Some(limit) = limit {
3187 (limit as usize).saturating_sub(total_executions)
3188 } else {
3189 usize::MAX
3190 };
3191
3192 if remaining == 0 {
3194 break;
3195 }
3196
3197 let page_limit = std::cmp::min(remaining, 100);
3199
3200 let params = BybitTradeHistoryParams {
3201 category: product_type,
3202 symbol: symbol.clone(),
3203 base_coin: None,
3204 order_id: None,
3205 order_link_id: None,
3206 start_time: start,
3207 end_time: end,
3208 exec_type: None,
3209 limit: Some(page_limit as u32),
3210 cursor: cursor.clone(),
3211 };
3212
3213 let response = self.inner.get_trade_history(¶ms).await?;
3214 let list_len = response.result.list.len();
3215 all_executions.extend(response.result.list);
3216 total_executions += list_len;
3217
3218 cursor = response.result.next_page_cursor;
3219 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3220 break;
3221 }
3222 }
3223
3224 let ts_init = self.generate_ts_init();
3225 let mut reports = Vec::new();
3226
3227 for execution in all_executions {
3228 let symbol_with_product =
3231 Symbol::from_ustr_unchecked(make_bybit_symbol(execution.symbol, product_type));
3232
3233 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3234 tracing::debug!(
3235 symbol = %execution.symbol,
3236 full_symbol = %symbol_with_product,
3237 "Skipping fill report for instrument not in cache"
3238 );
3239 continue;
3240 };
3241
3242 match parse_fill_report(&execution, account_id, &instrument, ts_init) {
3243 Ok(report) => reports.push(report),
3244 Err(e) => {
3245 tracing::error!("Failed to parse fill report: {e}");
3246 }
3247 }
3248 }
3249
3250 Ok(reports)
3251 }
3252
3253 pub async fn request_position_status_reports(
3265 &self,
3266 account_id: AccountId,
3267 product_type: BybitProductType,
3268 instrument_id: Option<InstrumentId>,
3269 ) -> anyhow::Result<Vec<PositionStatusReport>> {
3270 if product_type == BybitProductType::Spot {
3272 if self.use_spot_position_reports.load(Ordering::Relaxed) {
3273 return self
3274 .generate_spot_position_reports_from_wallet(account_id, instrument_id)
3275 .await;
3276 } else {
3277 return Ok(Vec::new());
3279 }
3280 }
3281
3282 let ts_init = self.generate_ts_init();
3283 let mut reports = Vec::new();
3284
3285 let symbol = if let Some(id) = instrument_id {
3287 let symbol_str = id.symbol.as_str();
3288 if symbol_str.is_empty() {
3289 anyhow::bail!("InstrumentId symbol is empty");
3290 }
3291 let bybit_symbol = BybitSymbol::new(symbol_str)?;
3292 Some(bybit_symbol.raw_symbol().to_string())
3293 } else {
3294 None
3295 };
3296
3297 if product_type == BybitProductType::Linear && symbol.is_none() {
3300 for settle_coin in ["USDT", "USDC"] {
3302 let mut cursor: Option<String> = None;
3303
3304 loop {
3305 let params = BybitPositionListParams {
3306 category: product_type,
3307 symbol: None,
3308 base_coin: None,
3309 settle_coin: Some(settle_coin.to_string()),
3310 limit: Some(200), cursor: cursor.clone(),
3312 };
3313
3314 let response = self.inner.get_positions(¶ms).await?;
3315
3316 for position in response.result.list {
3317 if position.symbol.is_empty() {
3318 continue;
3319 }
3320
3321 let symbol_with_product = Symbol::new(format!(
3322 "{}{}",
3323 position.symbol.as_str(),
3324 product_type.suffix()
3325 ));
3326
3327 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product)
3328 else {
3329 tracing::debug!(
3330 symbol = %position.symbol,
3331 full_symbol = %symbol_with_product,
3332 "Skipping position report for instrument not in cache"
3333 );
3334 continue;
3335 };
3336
3337 match parse_position_status_report(
3338 &position,
3339 account_id,
3340 &instrument,
3341 ts_init,
3342 ) {
3343 Ok(report) => reports.push(report),
3344 Err(e) => {
3345 tracing::error!("Failed to parse position status report: {e}");
3346 }
3347 }
3348 }
3349
3350 cursor = response.result.next_page_cursor;
3351 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
3352 break;
3353 }
3354 }
3355 }
3356 } else {
3357 let mut cursor: Option<String> = None;
3359
3360 loop {
3361 let params = BybitPositionListParams {
3362 category: product_type,
3363 symbol: symbol.clone(),
3364 base_coin: None,
3365 settle_coin: None,
3366 limit: Some(200), cursor: cursor.clone(),
3368 };
3369
3370 let response = self.inner.get_positions(¶ms).await?;
3371
3372 for position in response.result.list {
3373 if position.symbol.is_empty() {
3374 continue;
3375 }
3376
3377 let symbol_with_product = Symbol::new(format!(
3378 "{}{}",
3379 position.symbol.as_str(),
3380 product_type.suffix()
3381 ));
3382
3383 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3384 tracing::debug!(
3385 symbol = %position.symbol,
3386 full_symbol = %symbol_with_product,
3387 "Skipping position report for instrument not in cache"
3388 );
3389 continue;
3390 };
3391
3392 match parse_position_status_report(&position, account_id, &instrument, ts_init)
3393 {
3394 Ok(report) => reports.push(report),
3395 Err(e) => {
3396 tracing::error!("Failed to parse position status report: {e}");
3397 }
3398 }
3399 }
3400
3401 cursor = response.result.next_page_cursor;
3402 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3403 break;
3404 }
3405 }
3406 }
3407
3408 Ok(reports)
3409 }
3410}
3411
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A client built without credentials targets a Bybit URL and holds no credential.
    #[rstest]
    fn test_client_creation() {
        let result = BybitHttpClient::new(None, Some(60), None, None, None, None, None);
        assert!(result.is_ok());

        let http_client = result.unwrap();
        assert!(http_client.base_url().contains("bybit.com"));
        assert!(http_client.credential().is_none());
    }

    /// A client built with an API key and secret exposes a credential.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();
        let base_url = Some("https://api-testnet.bybit.com".to_string());

        let result = BybitHttpClient::with_credentials(
            api_key,
            api_secret,
            base_url,
            Some(60),
            None,
            None,
            None,
            None,
            None,
        );
        assert!(result.is_ok());

        let http_client = result.unwrap();
        assert!(http_client.credential().is_some());
    }

    /// Serializable params end up in the query string of the built path.
    #[rstest]
    fn test_build_path_with_params() {
        #[derive(Serialize)]
        struct TestParams {
            category: String,
            symbol: String,
        }

        let query = TestParams {
            category: "linear".to_string(),
            symbol: "BTCUSDT".to_string(),
        };

        let built = BybitRawHttpClient::build_path("/v5/market/test", &query);
        assert!(built.is_ok());

        let built = built.unwrap();
        assert!(built.contains("category=linear"));
    }

    /// Unit params leave the base path untouched.
    #[rstest]
    fn test_build_path_without_params() {
        let built = BybitRawHttpClient::build_path("/v5/market/time", &());
        assert!(built.is_ok());
        assert_eq!(built.unwrap(), "/v5/market/time");
    }

    /// The query produced by `build_path` equals direct `serde_urlencoded` serialization.
    #[rstest]
    fn test_params_serialization_matches_build_path() {
        #[derive(Serialize)]
        struct TestParams {
            category: String,
            limit: u32,
        }

        let query = TestParams {
            category: "spot".to_string(),
            limit: 50,
        };

        let built_path = BybitRawHttpClient::build_path("/v5/order/realtime", &query).unwrap();
        let query_from_path = built_path.split('?').nth(1).unwrap_or_default();

        let query_from_serde = serde_urlencoded::to_string(&query).unwrap();

        assert_eq!(query_from_path, query_from_serde);
    }

    /// Serializing the same params repeatedly yields the same query containing every field.
    #[rstest]
    fn test_params_serialization_order() {
        #[derive(Serialize)]
        struct OrderParams {
            category: String,
            symbol: String,
            limit: u32,
        }

        let query = OrderParams {
            category: "spot".to_string(),
            symbol: "BTCUSDT".to_string(),
            limit: 50,
        };

        let serialized: Vec<String> = (0..3)
            .map(|_| serde_urlencoded::to_string(&query).unwrap())
            .collect();

        assert_eq!(serialized[0], serialized[1]);
        assert_eq!(serialized[1], serialized[2]);

        for expected_pair in ["category=spot", "symbol=BTCUSDT", "limit=50"] {
            assert!(serialized[0].contains(expected_pair));
        }
    }
}