1use std::{
21 collections::HashMap,
22 fmt::Debug,
23 num::NonZeroU32,
24 sync::{
25 Arc, LazyLock,
26 atomic::{AtomicBool, Ordering},
27 },
28};
29
30use ahash::{AHashMap, AHashSet};
31use chrono::{DateTime, Utc};
32use dashmap::DashMap;
33use nautilus_core::{
34 consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt, nanos::UnixNanos,
35 time::get_atomic_clock_realtime,
36};
37use nautilus_model::{
38 data::{Bar, BarType, FundingRateUpdate, OrderBookDeltas, TradeTick},
39 enums::{OrderSide, OrderType, PositionSideSpecified, TimeInForce},
40 events::account::state::AccountState,
41 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
42 instruments::{Instrument, InstrumentAny},
43 reports::{FillReport, OrderStatusReport, PositionStatusReport},
44 types::{Price, Quantity},
45};
46use nautilus_network::{
47 http::{HttpClient, Method, USER_AGENT},
48 ratelimiter::quota::Quota,
49 retry::{RetryConfig, RetryManager},
50};
51use rust_decimal::Decimal;
52use serde::{Serialize, de::DeserializeOwned};
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use super::{
57 error::BybitHttpError,
58 models::{
59 BybitAccountDetailsResponse, BybitBorrowResponse, BybitFeeRate, BybitFeeRateResponse,
60 BybitFundingResponse, BybitInstrumentInverseResponse, BybitInstrumentLinearResponse,
61 BybitInstrumentOptionResponse, BybitInstrumentSpotResponse, BybitKlinesResponse,
62 BybitNoConvertRepayResponse, BybitOpenOrdersResponse, BybitOrderHistoryResponse,
63 BybitOrderbookResponse, BybitPlaceOrderResponse, BybitPositionListResponse,
64 BybitServerTimeResponse, BybitSetLeverageResponse, BybitSetMarginModeResponse,
65 BybitSetTradingStopResponse, BybitSwitchModeResponse, BybitTickerData,
66 BybitTradeHistoryResponse, BybitTradesResponse, BybitWalletBalanceResponse,
67 },
68 query::{
69 BybitAmendOrderParamsBuilder, BybitBatchAmendOrderEntryBuilder,
70 BybitBatchCancelOrderEntryBuilder, BybitBatchCancelOrderParamsBuilder,
71 BybitBatchPlaceOrderEntryBuilder, BybitBorrowParamsBuilder,
72 BybitCancelAllOrdersParamsBuilder, BybitCancelOrderParamsBuilder, BybitFeeRateParams,
73 BybitFeeRateParamsBuilder, BybitFundingParams, BybitFundingParamsBuilder,
74 BybitInstrumentsInfoParams, BybitKlinesParams, BybitKlinesParamsBuilder,
75 BybitNoConvertRepayParamsBuilder, BybitOpenOrdersParamsBuilder,
76 BybitOrderHistoryParamsBuilder, BybitOrderbookParams, BybitOrderbookParamsBuilder,
77 BybitPlaceOrderParamsBuilder, BybitPositionListParams, BybitSetLeverageParamsBuilder,
78 BybitSetMarginModeParamsBuilder, BybitSetTradingStopParams, BybitSwitchModeParamsBuilder,
79 BybitTickersParams, BybitTradeHistoryParams, BybitTradesParams, BybitTradesParamsBuilder,
80 BybitWalletBalanceParams,
81 },
82};
83use crate::common::{
84 consts::{BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN},
85 credential::Credential,
86 enums::{
87 BybitAccountType, BybitEnvironment, BybitMarginMode, BybitOpenOnly, BybitOrderFilter,
88 BybitOrderSide, BybitOrderType, BybitPositionMode, BybitProductType, BybitTimeInForce,
89 BybitTriggerDirection,
90 },
91 models::{BybitErrorCheck, BybitResponseCheck},
92 parse::{
93 bar_spec_to_bybit_interval, make_bybit_symbol, parse_account_state, parse_fill_report,
94 parse_funding_rate, parse_inverse_instrument, parse_kline_bar, parse_linear_instrument,
95 parse_option_instrument, parse_order_status_report, parse_orderbook,
96 parse_position_status_report, parse_spot_instrument, parse_trade_tick,
97 },
98 symbol::BybitSymbol,
99 urls::bybit_http_base_url,
100};
101
/// Receive window (in milliseconds) used when a constructor is given `None` for `recv_window_ms`.
const DEFAULT_RECV_WINDOW_MS: u64 = 5_000;
103
/// Quota of 10 requests per second.
///
/// Registered under the `bybit:global` rate limit key and also passed directly to
/// `HttpClient::new` by the client constructors.
pub static BYBIT_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
    Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
});
111
/// Quota of 1 request per second, registered under the no-convert-repay route key.
pub static BYBIT_REPAY_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
    Quota::per_second(NonZeroU32::new(1).expect("Should be a valid non-zero u32"))
});
118
/// Rate limit key attached to every request.
const BYBIT_GLOBAL_RATE_KEY: &str = "bybit:global";
/// Rate limit key for the no-convert-repay route; it has the `bybit:{path}` shape that
/// `rate_limit_keys` produces for that endpoint.
const BYBIT_REPAY_ROUTE_KEY: &str = "bybit:/v5/account/no-convert-repay";
121
/// Low-level HTTP client for the Bybit REST API.
///
/// Holds the base URL, the underlying [`HttpClient`], optional credentials used for
/// request signing, the receive window, a retry manager, and a cancellation token.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bybit")
)]
#[derive(Clone)]
pub struct BybitRawHttpClient {
    /// Prefix prepended to every endpoint path when building the request URL.
    base_url: String,
    /// Underlying HTTP client that performs the requests.
    client: HttpClient,
    /// API credentials; `None` means authenticated requests fail with `MissingCredentials`.
    credential: Option<Credential>,
    /// Receive window (milliseconds) included in the signature and the `X-BAPI-RECV-WINDOW` header.
    recv_window_ms: u64,
    /// Executes requests with retry handling.
    retry_manager: RetryManager<BybitHttpError>,
    /// Token handed to the retry manager; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
}
139
140impl Default for BybitRawHttpClient {
141 fn default() -> Self {
142 Self::new(None, Some(60), None, None, None, None, None)
143 .expect("Failed to create default BybitRawHttpClient")
144 }
145}
146
147impl Debug for BybitRawHttpClient {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(BybitRawHttpClient))
150 .field("base_url", &self.base_url)
151 .field("has_credentials", &self.credential.is_some())
152 .field("recv_window_ms", &self.recv_window_ms)
153 .finish()
154 }
155}
156
157impl BybitRawHttpClient {
158 pub fn cancel_all_requests(&self) {
160 self.cancellation_token.cancel();
161 }
162
163 pub fn cancellation_token(&self) -> &CancellationToken {
165 &self.cancellation_token
166 }
167
168 #[allow(clippy::too_many_arguments)]
174 pub fn new(
175 base_url: Option<String>,
176 timeout_secs: Option<u64>,
177 max_retries: Option<u32>,
178 retry_delay_ms: Option<u64>,
179 retry_delay_max_ms: Option<u64>,
180 recv_window_ms: Option<u64>,
181 proxy_url: Option<String>,
182 ) -> Result<Self, BybitHttpError> {
183 let retry_config = RetryConfig {
184 max_retries: max_retries.unwrap_or(3),
185 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
186 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
187 backoff_factor: 2.0,
188 jitter_ms: 1000,
189 operation_timeout_ms: Some(60_000),
190 immediate_first: false,
191 max_elapsed_ms: Some(180_000),
192 };
193
194 let retry_manager = RetryManager::new(retry_config);
195
196 Ok(Self {
197 base_url: base_url
198 .unwrap_or_else(|| bybit_http_base_url(BybitEnvironment::Mainnet).to_string()),
199 client: HttpClient::new(
200 Self::default_headers(),
201 vec![],
202 Self::rate_limiter_quotas(),
203 Some(*BYBIT_REST_QUOTA),
204 timeout_secs,
205 proxy_url,
206 )
207 .map_err(|e| {
208 BybitHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
209 })?,
210 credential: None,
211 recv_window_ms: recv_window_ms.unwrap_or(DEFAULT_RECV_WINDOW_MS),
212 retry_manager,
213 cancellation_token: CancellationToken::new(),
214 })
215 }
216
217 #[allow(clippy::too_many_arguments)]
223 pub fn with_credentials(
224 api_key: String,
225 api_secret: String,
226 base_url: Option<String>,
227 timeout_secs: Option<u64>,
228 max_retries: Option<u32>,
229 retry_delay_ms: Option<u64>,
230 retry_delay_max_ms: Option<u64>,
231 recv_window_ms: Option<u64>,
232 proxy_url: Option<String>,
233 ) -> Result<Self, BybitHttpError> {
234 let retry_config = RetryConfig {
235 max_retries: max_retries.unwrap_or(3),
236 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
237 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
238 backoff_factor: 2.0,
239 jitter_ms: 1000,
240 operation_timeout_ms: Some(60_000),
241 immediate_first: false,
242 max_elapsed_ms: Some(180_000),
243 };
244
245 let retry_manager = RetryManager::new(retry_config);
246
247 Ok(Self {
248 base_url: base_url
249 .unwrap_or_else(|| bybit_http_base_url(BybitEnvironment::Mainnet).to_string()),
250 client: HttpClient::new(
251 Self::default_headers(),
252 vec![],
253 Self::rate_limiter_quotas(),
254 Some(*BYBIT_REST_QUOTA),
255 timeout_secs,
256 proxy_url,
257 )
258 .map_err(|e| {
259 BybitHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
260 })?,
261 credential: Some(Credential::new(api_key, api_secret)),
262 recv_window_ms: recv_window_ms.unwrap_or(DEFAULT_RECV_WINDOW_MS),
263 retry_manager,
264 cancellation_token: CancellationToken::new(),
265 })
266 }
267
268 #[allow(clippy::too_many_arguments)]
280 pub fn new_with_env(
281 api_key: Option<String>,
282 api_secret: Option<String>,
283 base_url: Option<String>,
284 demo: bool,
285 testnet: bool,
286 timeout_secs: Option<u64>,
287 max_retries: Option<u32>,
288 retry_delay_ms: Option<u64>,
289 retry_delay_max_ms: Option<u64>,
290 recv_window_ms: Option<u64>,
291 proxy_url: Option<String>,
292 ) -> Result<Self, BybitHttpError> {
293 let (api_key_env, api_secret_env) = if demo {
294 ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET")
295 } else if testnet {
296 ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET")
297 } else {
298 ("BYBIT_API_KEY", "BYBIT_API_SECRET")
299 };
300
301 let key = get_or_env_var_opt(api_key, api_key_env);
302 let secret = get_or_env_var_opt(api_secret, api_secret_env);
303
304 if let (Some(k), Some(s)) = (key, secret) {
305 Self::with_credentials(
306 k,
307 s,
308 base_url,
309 timeout_secs,
310 max_retries,
311 retry_delay_ms,
312 retry_delay_max_ms,
313 recv_window_ms,
314 proxy_url,
315 )
316 } else {
317 Self::new(
318 base_url,
319 timeout_secs,
320 max_retries,
321 retry_delay_ms,
322 retry_delay_max_ms,
323 recv_window_ms,
324 proxy_url,
325 )
326 }
327 }
328
329 fn default_headers() -> HashMap<String, String> {
330 HashMap::from([
331 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
332 (
333 "X-Referer".to_string(),
334 BYBIT_NAUTILUS_BROKER_ID.to_string(),
335 ),
336 ])
337 }
338
339 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
340 vec![
341 (BYBIT_GLOBAL_RATE_KEY.to_string(), *BYBIT_REST_QUOTA),
342 (BYBIT_REPAY_ROUTE_KEY.to_string(), *BYBIT_REPAY_QUOTA),
343 ]
344 }
345
346 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
347 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
348 let route = format!("bybit:{normalized}");
349
350 vec![BYBIT_GLOBAL_RATE_KEY.to_string(), route]
351 }
352
353 fn sign_request(
354 &self,
355 timestamp: &str,
356 params: Option<&str>,
357 ) -> Result<HashMap<String, String>, BybitHttpError> {
358 let credential = self
359 .credential
360 .as_ref()
361 .ok_or(BybitHttpError::MissingCredentials)?;
362
363 let signature = credential.sign_with_payload(timestamp, self.recv_window_ms, params);
364
365 let mut headers = HashMap::new();
366 headers.insert(
367 "X-BAPI-API-KEY".to_string(),
368 credential.api_key().to_string(),
369 );
370 headers.insert("X-BAPI-TIMESTAMP".to_string(), timestamp.to_string());
371 headers.insert("X-BAPI-SIGN".to_string(), signature);
372 headers.insert(
373 "X-BAPI-RECV-WINDOW".to_string(),
374 self.recv_window_ms.to_string(),
375 );
376
377 Ok(headers)
378 }
379
380 async fn send_request<T: DeserializeOwned + BybitResponseCheck, P: Serialize>(
381 &self,
382 method: Method,
383 endpoint: &str,
384 params: Option<&P>,
385 body: Option<Vec<u8>>,
386 authenticate: bool,
387 ) -> Result<T, BybitHttpError> {
388 let endpoint = endpoint.to_string();
389 let url = format!("{}{endpoint}", self.base_url);
390 let method_clone = method.clone();
391 let body_clone = body.clone();
392
393 let params_str = if method == Method::GET {
395 params
396 .map(serde_urlencoded::to_string)
397 .transpose()
398 .map_err(|e| {
399 BybitHttpError::JsonError(format!("Failed to serialize params: {e}"))
400 })?
401 } else {
402 None
403 };
404
405 let operation = || {
406 let url = url.clone();
407 let method = method_clone.clone();
408 let body = body_clone.clone();
409 let endpoint = endpoint.clone();
410 let params_str = params_str.clone();
411
412 async move {
413 let mut headers = Self::default_headers();
414
415 if authenticate {
416 let timestamp = get_atomic_clock_realtime().get_time_ms().to_string();
417
418 let sign_payload = if method == Method::GET {
419 params_str.as_deref()
420 } else {
421 body.as_ref().and_then(|b| std::str::from_utf8(b).ok())
422 };
423
424 let auth_headers = self.sign_request(×tamp, sign_payload)?;
425 headers.extend(auth_headers);
426 }
427
428 if method == Method::POST || method == Method::PUT {
429 headers.insert("Content-Type".to_string(), "application/json".to_string());
430 }
431
432 let full_url = if let Some(ref query) = params_str {
433 if query.is_empty() {
434 url
435 } else {
436 format!("{url}?{query}")
437 }
438 } else {
439 url
440 };
441
442 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
443
444 let response = self
445 .client
446 .request(
447 method,
448 full_url,
449 None,
450 Some(headers),
451 body,
452 None,
453 Some(rate_limit_keys),
454 )
455 .await?;
456
457 if response.status.as_u16() >= 400 {
458 let body = String::from_utf8_lossy(&response.body).to_string();
459 return Err(BybitHttpError::UnexpectedStatus {
460 status: response.status.as_u16(),
461 body,
462 });
463 }
464
465 match serde_json::from_slice::<T>(&response.body) {
467 Ok(result) => {
468 if result.ret_code() != 0 {
470 return Err(BybitHttpError::BybitError {
471 error_code: result.ret_code() as i32,
472 message: result.ret_msg().to_string(),
473 });
474 }
475 Ok(result)
476 }
477 Err(json_err) => {
478 if let Ok(error_check) =
481 serde_json::from_slice::<BybitErrorCheck>(&response.body)
482 && error_check.ret_code != 0
483 {
484 return Err(BybitHttpError::BybitError {
485 error_code: error_check.ret_code as i32,
486 message: error_check.ret_msg,
487 });
488 }
489 Err(json_err.into())
491 }
492 }
493 }
494 };
495
496 let should_retry = |error: &BybitHttpError| -> bool {
497 match error {
498 BybitHttpError::NetworkError(_) => true,
499 BybitHttpError::UnexpectedStatus { status, .. } => *status >= 500,
500 _ => false,
501 }
502 };
503
504 let create_error = |msg: String| -> BybitHttpError {
505 if msg == "canceled" {
506 BybitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
507 } else {
508 BybitHttpError::NetworkError(msg)
509 }
510 };
511
512 self.retry_manager
513 .execute_with_retry_with_cancel(
514 endpoint.as_str(),
515 operation,
516 should_retry,
517 create_error,
518 &self.cancellation_token,
519 )
520 .await
521 }
522
523 #[cfg(test)]
524 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, BybitHttpError> {
525 let query = serde_urlencoded::to_string(params)
526 .map_err(|e| BybitHttpError::JsonError(e.to_string()))?;
527 if query.is_empty() {
528 Ok(base.to_owned())
529 } else {
530 Ok(format!("{base}?{query}"))
531 }
532 }
533
534 pub async fn get_server_time(&self) -> Result<BybitServerTimeResponse, BybitHttpError> {
544 self.send_request::<_, ()>(Method::GET, "/v5/market/time", None, None, false)
545 .await
546 }
547
548 pub async fn get_instruments<T: DeserializeOwned + BybitResponseCheck>(
558 &self,
559 params: &BybitInstrumentsInfoParams,
560 ) -> Result<T, BybitHttpError> {
561 self.send_request(
562 Method::GET,
563 "/v5/market/instruments-info",
564 Some(params),
565 None,
566 false,
567 )
568 .await
569 }
570
571 pub async fn get_instruments_spot(
581 &self,
582 params: &BybitInstrumentsInfoParams,
583 ) -> Result<BybitInstrumentSpotResponse, BybitHttpError> {
584 self.get_instruments(params).await
585 }
586
587 pub async fn get_instruments_linear(
597 &self,
598 params: &BybitInstrumentsInfoParams,
599 ) -> Result<BybitInstrumentLinearResponse, BybitHttpError> {
600 self.get_instruments(params).await
601 }
602
603 pub async fn get_instruments_inverse(
613 &self,
614 params: &BybitInstrumentsInfoParams,
615 ) -> Result<BybitInstrumentInverseResponse, BybitHttpError> {
616 self.get_instruments(params).await
617 }
618
619 pub async fn get_instruments_option(
629 &self,
630 params: &BybitInstrumentsInfoParams,
631 ) -> Result<BybitInstrumentOptionResponse, BybitHttpError> {
632 self.get_instruments(params).await
633 }
634
635 pub async fn get_klines(
645 &self,
646 params: &BybitKlinesParams,
647 ) -> Result<BybitKlinesResponse, BybitHttpError> {
648 self.send_request(Method::GET, "/v5/market/kline", Some(params), None, false)
649 .await
650 }
651
652 pub async fn get_recent_trades(
662 &self,
663 params: &BybitTradesParams,
664 ) -> Result<BybitTradesResponse, BybitHttpError> {
665 self.send_request(
666 Method::GET,
667 "/v5/market/recent-trade",
668 Some(params),
669 None,
670 false,
671 )
672 .await
673 }
674
675 pub async fn get_funding_history(
685 &self,
686 params: &BybitFundingParams,
687 ) -> Result<BybitFundingResponse, BybitHttpError> {
688 self.send_request(
689 Method::GET,
690 "/v5/market/funding/history",
691 Some(params),
692 None,
693 false,
694 )
695 .await
696 }
697
698 pub async fn get_orderbook(
708 &self,
709 params: &BybitOrderbookParams,
710 ) -> Result<BybitOrderbookResponse, BybitHttpError> {
711 self.send_request(
712 Method::GET,
713 "/v5/market/orderbook",
714 Some(params),
715 None,
716 false,
717 )
718 .await
719 }
720
721 #[allow(clippy::too_many_arguments)]
735 pub async fn get_open_orders(
736 &self,
737 category: BybitProductType,
738 symbol: Option<String>,
739 base_coin: Option<String>,
740 settle_coin: Option<String>,
741 order_id: Option<String>,
742 order_link_id: Option<String>,
743 open_only: Option<BybitOpenOnly>,
744 order_filter: Option<BybitOrderFilter>,
745 limit: Option<u32>,
746 cursor: Option<String>,
747 ) -> Result<BybitOpenOrdersResponse, BybitHttpError> {
748 let mut builder = BybitOpenOrdersParamsBuilder::default();
749 builder.category(category);
750
751 if let Some(s) = symbol {
752 builder.symbol(s);
753 }
754 if let Some(bc) = base_coin {
755 builder.base_coin(bc);
756 }
757 if let Some(sc) = settle_coin {
758 builder.settle_coin(sc);
759 }
760 if let Some(oi) = order_id {
761 builder.order_id(oi);
762 }
763 if let Some(ol) = order_link_id {
764 builder.order_link_id(ol);
765 }
766 if let Some(oo) = open_only {
767 builder.open_only(oo);
768 }
769 if let Some(of) = order_filter {
770 builder.order_filter(of);
771 }
772 if let Some(l) = limit {
773 builder.limit(l);
774 }
775 if let Some(c) = cursor {
776 builder.cursor(c);
777 }
778
779 let params = builder
780 .build()
781 .expect("Failed to build BybitOpenOrdersParams");
782
783 self.send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
784 .await
785 }
786
787 pub async fn place_order(
797 &self,
798 request: &serde_json::Value,
799 ) -> Result<BybitPlaceOrderResponse, BybitHttpError> {
800 let body = serde_json::to_vec(request)?;
801 self.send_request::<_, ()>(Method::POST, "/v5/order/create", None, Some(body), true)
802 .await
803 }
804
805 pub async fn get_wallet_balance(
815 &self,
816 params: &BybitWalletBalanceParams,
817 ) -> Result<BybitWalletBalanceResponse, BybitHttpError> {
818 self.send_request(
819 Method::GET,
820 "/v5/account/wallet-balance",
821 Some(params),
822 None,
823 true,
824 )
825 .await
826 }
827
828 pub async fn get_account_details(&self) -> Result<BybitAccountDetailsResponse, BybitHttpError> {
838 self.send_request::<_, ()>(Method::GET, "/v5/user/query-api", None, None, true)
839 .await
840 }
841
842 pub async fn get_fee_rate(
852 &self,
853 params: &BybitFeeRateParams,
854 ) -> Result<BybitFeeRateResponse, BybitHttpError> {
855 self.send_request(
856 Method::GET,
857 "/v5/account/fee-rate",
858 Some(params),
859 None,
860 true,
861 )
862 .await
863 }
864
865 pub async fn set_margin_mode(
882 &self,
883 margin_mode: BybitMarginMode,
884 ) -> Result<BybitSetMarginModeResponse, BybitHttpError> {
885 let params = BybitSetMarginModeParamsBuilder::default()
886 .set_margin_mode(margin_mode)
887 .build()
888 .expect("Failed to build BybitSetMarginModeParams");
889
890 let body = serde_json::to_vec(¶ms)?;
891 self.send_request::<_, ()>(
892 Method::POST,
893 "/v5/account/set-margin-mode",
894 None,
895 Some(body),
896 true,
897 )
898 .await
899 }
900
901 pub async fn set_leverage(
918 &self,
919 product_type: BybitProductType,
920 symbol: &str,
921 buy_leverage: &str,
922 sell_leverage: &str,
923 ) -> Result<BybitSetLeverageResponse, BybitHttpError> {
924 let params = BybitSetLeverageParamsBuilder::default()
925 .category(product_type)
926 .symbol(symbol.to_string())
927 .buy_leverage(buy_leverage.to_string())
928 .sell_leverage(sell_leverage.to_string())
929 .build()
930 .expect("Failed to build BybitSetLeverageParams");
931
932 let body = serde_json::to_vec(¶ms)?;
933 self.send_request::<_, ()>(
934 Method::POST,
935 "/v5/position/set-leverage",
936 None,
937 Some(body),
938 true,
939 )
940 .await
941 }
942
943 pub async fn switch_mode(
960 &self,
961 product_type: BybitProductType,
962 mode: BybitPositionMode,
963 symbol: Option<String>,
964 coin: Option<String>,
965 ) -> Result<BybitSwitchModeResponse, BybitHttpError> {
966 let mut builder = BybitSwitchModeParamsBuilder::default();
967 builder.category(product_type);
968 builder.mode(mode);
969
970 if let Some(s) = symbol {
971 builder.symbol(s);
972 }
973 if let Some(c) = coin {
974 builder.coin(c);
975 }
976
977 let params = builder
978 .build()
979 .expect("Failed to build BybitSwitchModeParams");
980
981 let body = serde_json::to_vec(¶ms)?;
982 self.send_request::<_, ()>(
983 Method::POST,
984 "/v5/position/switch-mode",
985 None,
986 Some(body),
987 true,
988 )
989 .await
990 }
991
992 pub async fn set_trading_stop(
1005 &self,
1006 params: &BybitSetTradingStopParams,
1007 ) -> Result<BybitSetTradingStopResponse, BybitHttpError> {
1008 let body = serde_json::to_vec(params)?;
1009 self.send_request::<_, ()>(
1010 Method::POST,
1011 "/v5/position/trading-stop",
1012 None,
1013 Some(body),
1014 true,
1015 )
1016 .await
1017 }
1018
1019 pub async fn borrow(
1036 &self,
1037 coin: &str,
1038 amount: &str,
1039 ) -> Result<BybitBorrowResponse, BybitHttpError> {
1040 let params = BybitBorrowParamsBuilder::default()
1041 .coin(coin.to_string())
1042 .amount(amount.to_string())
1043 .build()
1044 .expect("Failed to build BybitBorrowParams");
1045
1046 let body = serde_json::to_vec(¶ms)?;
1047 self.send_request::<_, ()>(Method::POST, "/v5/account/borrow", None, Some(body), true)
1048 .await
1049 }
1050
1051 pub async fn no_convert_repay(
1069 &self,
1070 coin: &str,
1071 amount: Option<&str>,
1072 ) -> Result<BybitNoConvertRepayResponse, BybitHttpError> {
1073 let mut builder = BybitNoConvertRepayParamsBuilder::default();
1074 builder.coin(coin.to_string());
1075
1076 if let Some(amt) = amount {
1077 builder.amount(amt.to_string());
1078 }
1079
1080 let params = builder
1081 .build()
1082 .expect("Failed to build BybitNoConvertRepayParams");
1083
1084 if let Ok(params_json) = serde_json::to_string(¶ms) {
1086 log::debug!("Repay request params: {params_json}");
1087 }
1088
1089 let body = serde_json::to_vec(¶ms)?;
1090 let result = self
1091 .send_request::<_, ()>(
1092 Method::POST,
1093 "/v5/account/no-convert-repay",
1094 None,
1095 Some(body),
1096 true,
1097 )
1098 .await;
1099
1100 if let Err(ref e) = result
1102 && let Ok(params_json) = serde_json::to_string(¶ms)
1103 {
1104 log::error!("Repay request failed with params {params_json}: {e}");
1105 }
1106
1107 result
1108 }
1109
1110 pub async fn get_tickers<T: DeserializeOwned + BybitResponseCheck>(
1120 &self,
1121 params: &BybitTickersParams,
1122 ) -> Result<T, BybitHttpError> {
1123 self.send_request(Method::GET, "/v5/market/tickers", Some(params), None, false)
1124 .await
1125 }
1126
1127 pub async fn get_trade_history(
1137 &self,
1138 params: &BybitTradeHistoryParams,
1139 ) -> Result<BybitTradeHistoryResponse, BybitHttpError> {
1140 self.send_request(Method::GET, "/v5/execution/list", Some(params), None, true)
1141 .await
1142 }
1143
1144 pub async fn get_positions(
1157 &self,
1158 params: &BybitPositionListParams,
1159 ) -> Result<BybitPositionListResponse, BybitHttpError> {
1160 self.send_request(Method::GET, "/v5/position/list", Some(params), None, true)
1161 .await
1162 }
1163
1164 #[must_use]
1166 pub fn base_url(&self) -> &str {
1167 &self.base_url
1168 }
1169
1170 #[must_use]
1172 pub fn recv_window_ms(&self) -> u64 {
1173 self.recv_window_ms
1174 }
1175
1176 #[must_use]
1178 pub fn credential(&self) -> Option<&Credential> {
1179 self.credential.as_ref()
1180 }
1181}
1182
/// Higher-level Bybit HTTP client that wraps a shared [`BybitRawHttpClient`] and keeps
/// an instrument cache.
///
/// Every field is behind an `Arc`, so clones share the same raw client, cache and flags.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bybit")
)]
pub struct BybitHttpClient {
    /// Raw client that performs the actual requests.
    pub(crate) inner: Arc<BybitRawHttpClient>,
    /// Cached instruments keyed by the inner `Ustr` of their symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` by `cache_instrument` and `cache_instruments`.
    cache_initialized: Arc<AtomicBool>,
    /// Flag written by `set_use_spot_position_reports`.
    use_spot_position_reports: Arc<AtomicBool>,
}
1198
1199impl Clone for BybitHttpClient {
1200 fn clone(&self) -> Self {
1201 Self {
1202 inner: self.inner.clone(),
1203 instruments_cache: self.instruments_cache.clone(),
1204 cache_initialized: self.cache_initialized.clone(),
1205 use_spot_position_reports: self.use_spot_position_reports.clone(),
1206 }
1207 }
1208}
1209
1210impl Default for BybitHttpClient {
1211 fn default() -> Self {
1212 Self::new(None, Some(60), None, None, None, None, None)
1213 .expect("Failed to create default BybitHttpClient")
1214 }
1215}
1216
1217impl Debug for BybitHttpClient {
1218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1219 f.debug_struct(stringify!(BybitHttpClient))
1220 .field("inner", &self.inner)
1221 .finish()
1222 }
1223}
1224
1225impl BybitHttpClient {
1226 #[allow(clippy::too_many_arguments)]
1232 pub fn new(
1233 base_url: Option<String>,
1234 timeout_secs: Option<u64>,
1235 max_retries: Option<u32>,
1236 retry_delay_ms: Option<u64>,
1237 retry_delay_max_ms: Option<u64>,
1238 recv_window_ms: Option<u64>,
1239 proxy_url: Option<String>,
1240 ) -> Result<Self, BybitHttpError> {
1241 Ok(Self {
1242 inner: Arc::new(BybitRawHttpClient::new(
1243 base_url,
1244 timeout_secs,
1245 max_retries,
1246 retry_delay_ms,
1247 retry_delay_max_ms,
1248 recv_window_ms,
1249 proxy_url,
1250 )?),
1251 instruments_cache: Arc::new(DashMap::new()),
1252 cache_initialized: Arc::new(AtomicBool::new(false)),
1253 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1254 })
1255 }
1256
1257 #[allow(clippy::too_many_arguments)]
1263 pub fn with_credentials(
1264 api_key: String,
1265 api_secret: String,
1266 base_url: Option<String>,
1267 timeout_secs: Option<u64>,
1268 max_retries: Option<u32>,
1269 retry_delay_ms: Option<u64>,
1270 retry_delay_max_ms: Option<u64>,
1271 recv_window_ms: Option<u64>,
1272 proxy_url: Option<String>,
1273 ) -> Result<Self, BybitHttpError> {
1274 Ok(Self {
1275 inner: Arc::new(BybitRawHttpClient::with_credentials(
1276 api_key,
1277 api_secret,
1278 base_url,
1279 timeout_secs,
1280 max_retries,
1281 retry_delay_ms,
1282 retry_delay_max_ms,
1283 recv_window_ms,
1284 proxy_url,
1285 )?),
1286 instruments_cache: Arc::new(DashMap::new()),
1287 cache_initialized: Arc::new(AtomicBool::new(false)),
1288 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1289 })
1290 }
1291
1292 #[allow(clippy::too_many_arguments)]
1305 pub fn new_with_env(
1306 api_key: Option<String>,
1307 api_secret: Option<String>,
1308 base_url: Option<String>,
1309 demo: bool,
1310 testnet: bool,
1311 timeout_secs: Option<u64>,
1312 max_retries: Option<u32>,
1313 retry_delay_ms: Option<u64>,
1314 retry_delay_max_ms: Option<u64>,
1315 recv_window_ms: Option<u64>,
1316 proxy_url: Option<String>,
1317 ) -> Result<Self, BybitHttpError> {
1318 let (api_key_env, api_secret_env) = if demo {
1319 ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET")
1320 } else if testnet {
1321 ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET")
1322 } else {
1323 ("BYBIT_API_KEY", "BYBIT_API_SECRET")
1324 };
1325
1326 let key = get_or_env_var_opt(api_key, api_key_env);
1327 let secret = get_or_env_var_opt(api_secret, api_secret_env);
1328
1329 match (key, secret) {
1330 (Some(k), Some(s)) => Self::with_credentials(
1331 k,
1332 s,
1333 base_url,
1334 timeout_secs,
1335 max_retries,
1336 retry_delay_ms,
1337 retry_delay_max_ms,
1338 recv_window_ms,
1339 proxy_url,
1340 ),
1341 _ => Self::new(
1342 base_url,
1343 timeout_secs,
1344 max_retries,
1345 retry_delay_ms,
1346 retry_delay_max_ms,
1347 recv_window_ms,
1348 proxy_url,
1349 ),
1350 }
1351 }
1352
1353 #[must_use]
1354 pub fn base_url(&self) -> &str {
1355 self.inner.base_url()
1356 }
1357
1358 #[must_use]
1359 pub fn recv_window_ms(&self) -> u64 {
1360 self.inner.recv_window_ms()
1361 }
1362
1363 #[must_use]
1364 pub fn credential(&self) -> Option<&Credential> {
1365 self.inner.credential()
1366 }
1367
1368 pub fn set_use_spot_position_reports(&self, use_spot_position_reports: bool) {
1369 self.use_spot_position_reports
1370 .store(use_spot_position_reports, Ordering::Relaxed);
1371 }
1372
1373 pub fn cancel_all_requests(&self) {
1374 self.inner.cancel_all_requests();
1375 }
1376
1377 pub fn cancellation_token(&self) -> &CancellationToken {
1378 self.inner.cancellation_token()
1379 }
1380
1381 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1383 self.instruments_cache
1384 .insert(instrument.symbol().inner(), instrument);
1385 self.cache_initialized.store(true, Ordering::Release);
1386 }
1387
1388 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1390 for instrument in instruments {
1391 self.instruments_cache
1392 .insert(instrument.symbol().inner(), instrument);
1393 }
1394 self.cache_initialized.store(true, Ordering::Release);
1395 }
1396
1397 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1398 self.instruments_cache
1399 .get(symbol)
1400 .map(|entry| entry.value().clone())
1401 }
1402
1403 fn instrument_from_cache(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
1404 self.get_instrument(&symbol.inner()).ok_or_else(|| {
1405 anyhow::anyhow!(
1406 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1407 )
1408 })
1409 }
1410
1411 #[must_use]
1412 fn generate_ts_init(&self) -> UnixNanos {
1413 get_atomic_clock_realtime().get_time_ns()
1414 }
1415
1416 pub async fn get_server_time(&self) -> Result<BybitServerTimeResponse, BybitHttpError> {
1428 self.inner.get_server_time().await
1429 }
1430
1431 pub async fn get_instruments<T: DeserializeOwned + BybitResponseCheck>(
1443 &self,
1444 params: &BybitInstrumentsInfoParams,
1445 ) -> Result<T, BybitHttpError> {
1446 self.inner.get_instruments(params).await
1447 }
1448
1449 pub async fn get_instruments_spot(
1461 &self,
1462 params: &BybitInstrumentsInfoParams,
1463 ) -> Result<BybitInstrumentSpotResponse, BybitHttpError> {
1464 self.inner.get_instruments_spot(params).await
1465 }
1466
1467 pub async fn get_instruments_linear(
1479 &self,
1480 params: &BybitInstrumentsInfoParams,
1481 ) -> Result<BybitInstrumentLinearResponse, BybitHttpError> {
1482 self.inner.get_instruments_linear(params).await
1483 }
1484
1485 pub async fn get_instruments_inverse(
1497 &self,
1498 params: &BybitInstrumentsInfoParams,
1499 ) -> Result<BybitInstrumentInverseResponse, BybitHttpError> {
1500 self.inner.get_instruments_inverse(params).await
1501 }
1502
1503 pub async fn get_instruments_option(
1515 &self,
1516 params: &BybitInstrumentsInfoParams,
1517 ) -> Result<BybitInstrumentOptionResponse, BybitHttpError> {
1518 self.inner.get_instruments_option(params).await
1519 }
1520
1521 pub async fn get_klines(
1533 &self,
1534 params: &BybitKlinesParams,
1535 ) -> Result<BybitKlinesResponse, BybitHttpError> {
1536 self.inner.get_klines(params).await
1537 }
1538
1539 pub async fn get_recent_trades(
1551 &self,
1552 params: &BybitTradesParams,
1553 ) -> Result<BybitTradesResponse, BybitHttpError> {
1554 self.inner.get_recent_trades(params).await
1555 }
1556
1557 #[allow(clippy::too_many_arguments)]
1569 pub async fn get_open_orders(
1570 &self,
1571 category: BybitProductType,
1572 symbol: Option<String>,
1573 base_coin: Option<String>,
1574 settle_coin: Option<String>,
1575 order_id: Option<String>,
1576 order_link_id: Option<String>,
1577 open_only: Option<BybitOpenOnly>,
1578 order_filter: Option<BybitOrderFilter>,
1579 limit: Option<u32>,
1580 cursor: Option<String>,
1581 ) -> Result<BybitOpenOrdersResponse, BybitHttpError> {
1582 self.inner
1583 .get_open_orders(
1584 category,
1585 symbol,
1586 base_coin,
1587 settle_coin,
1588 order_id,
1589 order_link_id,
1590 open_only,
1591 order_filter,
1592 limit,
1593 cursor,
1594 )
1595 .await
1596 }
1597
/// Places an order from an already-built JSON request body.
///
/// Thin pass-through: `request` is forwarded unchanged to the inner client's
/// `place_order` call; no validation is performed here.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn place_order(
    &self,
    request: &serde_json::Value,
) -> Result<BybitPlaceOrderResponse, BybitHttpError> {
    self.inner.place_order(request).await
}
1615
/// Requests the wallet balance and returns the raw [`BybitWalletBalanceResponse`].
///
/// Thin pass-through: `params` is forwarded unchanged to the inner client's
/// `get_wallet_balance` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn get_wallet_balance(
    &self,
    params: &BybitWalletBalanceParams,
) -> Result<BybitWalletBalanceResponse, BybitHttpError> {
    self.inner.get_wallet_balance(params).await
}
1633
/// Requests account details and returns the raw [`BybitAccountDetailsResponse`].
///
/// Thin pass-through to the inner client's `get_account_details` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn get_account_details(&self) -> Result<BybitAccountDetailsResponse, BybitHttpError> {
    self.inner.get_account_details().await
}
1648
/// Requests positions and returns the raw [`BybitPositionListResponse`].
///
/// Thin pass-through: `params` is forwarded unchanged to the inner client's
/// `get_positions` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn get_positions(
    &self,
    params: &BybitPositionListParams,
) -> Result<BybitPositionListResponse, BybitHttpError> {
    self.inner.get_positions(params).await
}
1667
/// Requests fee rates and returns the raw [`BybitFeeRateResponse`].
///
/// Thin pass-through: `params` is forwarded unchanged to the inner client's
/// `get_fee_rate` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn get_fee_rate(
    &self,
    params: &BybitFeeRateParams,
) -> Result<BybitFeeRateResponse, BybitHttpError> {
    self.inner.get_fee_rate(params).await
}
1686
/// Sets the account margin mode to `margin_mode`.
///
/// Thin pass-through to the inner client's `set_margin_mode` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn set_margin_mode(
    &self,
    margin_mode: BybitMarginMode,
) -> Result<BybitSetMarginModeResponse, BybitHttpError> {
    self.inner.set_margin_mode(margin_mode).await
}
1705
/// Sets buy and sell leverage for `symbol` in the given product type.
///
/// Thin pass-through: all arguments are forwarded unchanged, in the same order, to the
/// inner client's `set_leverage` call. The leverage values are passed as strings and are
/// not parsed or validated here.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn set_leverage(
    &self,
    product_type: BybitProductType,
    symbol: &str,
    buy_leverage: &str,
    sell_leverage: &str,
) -> Result<BybitSetLeverageResponse, BybitHttpError> {
    self.inner
        .set_leverage(product_type, symbol, buy_leverage, sell_leverage)
        .await
}
1729
/// Switches the position mode to `mode` for the given product type.
///
/// Thin pass-through: all arguments (including the optional `symbol` and `coin`) are
/// forwarded unchanged, in the same order, to the inner client's `switch_mode` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn switch_mode(
    &self,
    product_type: BybitProductType,
    mode: BybitPositionMode,
    symbol: Option<String>,
    coin: Option<String>,
) -> Result<BybitSwitchModeResponse, BybitHttpError> {
    self.inner
        .switch_mode(product_type, mode, symbol, coin)
        .await
}
1753
/// Sets a trading stop and returns the raw [`BybitSetTradingStopResponse`].
///
/// Thin pass-through: `params` is forwarded unchanged to the inner client's
/// `set_trading_stop` call.
///
/// # Errors
///
/// Propagates any [`BybitHttpError`] returned by the inner client.
pub async fn set_trading_stop(
    &self,
    params: &BybitSetTradingStopParams,
) -> Result<BybitSetTradingStopResponse, BybitHttpError> {
    self.inner.set_trading_stop(params).await
}
1772
1773 pub async fn get_spot_borrow_amount(&self, coin: &str) -> anyhow::Result<Decimal> {
1788 let params = BybitWalletBalanceParams {
1789 account_type: BybitAccountType::Unified,
1790 coin: Some(coin.to_string()),
1791 };
1792
1793 let response = self.inner.get_wallet_balance(¶ms).await?;
1794
1795 let borrow_amount = response
1796 .result
1797 .list
1798 .first()
1799 .and_then(|wallet| wallet.coin.iter().find(|c| c.coin.as_str() == coin))
1800 .map_or(Decimal::ZERO, |balance| balance.spot_borrow);
1801
1802 Ok(borrow_amount)
1803 }
1804
1805 pub async fn borrow_spot(
1821 &self,
1822 coin: &str,
1823 amount: Quantity,
1824 ) -> anyhow::Result<BybitBorrowResponse> {
1825 let amount_str = amount.to_string();
1826 self.inner
1827 .borrow(coin, &amount_str)
1828 .await
1829 .map_err(|e| anyhow::anyhow!("Failed to borrow {amount} {coin}: {e}"))
1830 }
1831
1832 pub async fn repay_spot_borrow(
1849 &self,
1850 coin: &str,
1851 amount: Option<Quantity>,
1852 ) -> anyhow::Result<BybitNoConvertRepayResponse> {
1853 let amount_str = amount.as_ref().map(|q| q.to_string());
1854 self.inner
1855 .no_convert_repay(coin, amount_str.as_deref())
1856 .await
1857 .map_err(|e| anyhow::anyhow!("Failed to repay spot borrow for {coin}: {e}"))
1858 }
1859
1860 async fn generate_spot_position_reports_from_wallet(
1868 &self,
1869 account_id: AccountId,
1870 instrument_id: Option<InstrumentId>,
1871 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1872 let params = BybitWalletBalanceParams {
1873 account_type: BybitAccountType::Unified,
1874 coin: None,
1875 };
1876
1877 let response = self.inner.get_wallet_balance(¶ms).await?;
1878 let ts_init = self.generate_ts_init();
1879
1880 let mut wallet_by_coin: HashMap<Ustr, Decimal> = HashMap::new();
1881
1882 for wallet in &response.result.list {
1883 for coin_balance in &wallet.coin {
1884 let balance = coin_balance.wallet_balance - coin_balance.spot_borrow;
1885 *wallet_by_coin
1886 .entry(coin_balance.coin)
1887 .or_insert(Decimal::ZERO) += balance;
1888 }
1889 }
1890
1891 let mut reports = Vec::new();
1892
1893 if let Some(instrument_id) = instrument_id {
1894 if let Some(instrument) = self.instruments_cache.get(&instrument_id.symbol.inner()) {
1895 let base_currency = instrument
1896 .base_currency()
1897 .expect("SPOT instrument should have base currency");
1898 let coin = base_currency.code;
1899 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(Decimal::ZERO);
1900
1901 let side = if wallet_balance > Decimal::ZERO {
1902 PositionSideSpecified::Long
1903 } else if wallet_balance < Decimal::ZERO {
1904 PositionSideSpecified::Short
1905 } else {
1906 PositionSideSpecified::Flat
1907 };
1908
1909 let abs_balance = wallet_balance.abs();
1910 let quantity = Quantity::from_decimal_dp(abs_balance, instrument.size_precision())?;
1911
1912 let report = PositionStatusReport::new(
1913 account_id,
1914 instrument_id,
1915 side,
1916 quantity,
1917 ts_init,
1918 ts_init,
1919 None,
1920 None,
1921 None,
1922 );
1923
1924 reports.push(report);
1925 }
1926 } else {
1927 for entry in self.instruments_cache.iter() {
1929 let symbol = entry.key();
1930 let instrument = entry.value();
1931 if !symbol.as_str().ends_with("-SPOT") {
1933 continue;
1934 }
1935
1936 let base_currency = match instrument.base_currency() {
1937 Some(currency) => currency,
1938 None => continue,
1939 };
1940
1941 let coin = base_currency.code;
1942 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(Decimal::ZERO);
1943
1944 if wallet_balance.is_zero() {
1945 continue;
1946 }
1947
1948 let side = if wallet_balance > Decimal::ZERO {
1949 PositionSideSpecified::Long
1950 } else if wallet_balance < Decimal::ZERO {
1951 PositionSideSpecified::Short
1952 } else {
1953 PositionSideSpecified::Flat
1954 };
1955
1956 let abs_balance = wallet_balance.abs();
1957 let quantity = Quantity::from_decimal_dp(abs_balance, instrument.size_precision())?;
1958
1959 if quantity.is_zero() {
1960 continue;
1961 }
1962
1963 let report = PositionStatusReport::new(
1964 account_id,
1965 instrument.id(),
1966 side,
1967 quantity,
1968 ts_init,
1969 ts_init,
1970 None,
1971 None,
1972 None,
1973 );
1974
1975 reports.push(report);
1976 }
1977 }
1978
1979 Ok(reports)
1980 }
1981
1982 #[allow(clippy::too_many_arguments)]
1993 pub async fn submit_order(
1994 &self,
1995 account_id: AccountId,
1996 product_type: BybitProductType,
1997 instrument_id: InstrumentId,
1998 client_order_id: ClientOrderId,
1999 order_side: OrderSide,
2000 order_type: OrderType,
2001 quantity: Quantity,
2002 time_in_force: Option<TimeInForce>,
2003 price: Option<Price>,
2004 trigger_price: Option<Price>,
2005 post_only: Option<bool>,
2006 reduce_only: bool,
2007 is_quote_quantity: bool,
2008 is_leverage: bool,
2009 ) -> anyhow::Result<OrderStatusReport> {
2010 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2011 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2012
2013 let bybit_side = match order_side {
2014 OrderSide::Buy => BybitOrderSide::Buy,
2015 OrderSide::Sell => BybitOrderSide::Sell,
2016 _ => anyhow::bail!("Invalid order side: {order_side:?}"),
2017 };
2018
2019 let (bybit_order_type, is_stop_order) = match order_type {
2021 OrderType::Market => (BybitOrderType::Market, false),
2022 OrderType::Limit => (BybitOrderType::Limit, false),
2023 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
2024 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
2025 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
2026 };
2027
2028 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
2030 None
2031 } else if post_only == Some(true) {
2032 Some(BybitTimeInForce::PostOnly)
2033 } else if let Some(tif) = time_in_force {
2034 Some(match tif {
2035 TimeInForce::Gtc => BybitTimeInForce::Gtc,
2036 TimeInForce::Ioc => BybitTimeInForce::Ioc,
2037 TimeInForce::Fok => BybitTimeInForce::Fok,
2038 _ => anyhow::bail!("Unsupported time in force: {tif:?}"),
2039 })
2040 } else {
2041 None
2042 };
2043
2044 let market_unit = if product_type == BybitProductType::Spot
2046 && bybit_order_type == BybitOrderType::Market
2047 {
2048 if is_quote_quantity {
2049 Some(BYBIT_QUOTE_COIN.to_string())
2050 } else {
2051 Some(BYBIT_BASE_COIN.to_string())
2052 }
2053 } else {
2054 None
2055 };
2056
2057 let trigger_direction = if is_stop_order {
2060 match (order_type, order_side) {
2061 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
2062 Some(BybitTriggerDirection::RisesTo)
2063 }
2064 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
2065 Some(BybitTriggerDirection::FallsTo)
2066 }
2067 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
2068 Some(BybitTriggerDirection::FallsTo)
2069 }
2070 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
2071 Some(BybitTriggerDirection::RisesTo)
2072 }
2073 _ => None,
2074 }
2075 } else {
2076 None
2077 };
2078
2079 let mut order_entry = BybitBatchPlaceOrderEntryBuilder::default();
2080 order_entry.symbol(bybit_symbol.raw_symbol().to_string());
2081 order_entry.side(bybit_side);
2082 order_entry.order_type(bybit_order_type);
2083 order_entry.qty(quantity.to_string());
2084 order_entry.time_in_force(bybit_tif);
2085 order_entry.order_link_id(client_order_id.to_string());
2086 order_entry.market_unit(market_unit);
2087 order_entry.trigger_direction(trigger_direction);
2088
2089 if let Some(price) = price {
2090 order_entry.price(Some(price.to_string()));
2091 }
2092
2093 if let Some(trigger_price) = trigger_price {
2094 order_entry.trigger_price(Some(trigger_price.to_string()));
2095 }
2096
2097 if reduce_only {
2098 order_entry.reduce_only(Some(true));
2099 }
2100
2101 let is_leverage_value = if product_type == BybitProductType::Spot {
2103 Some(i32::from(is_leverage))
2104 } else {
2105 None
2106 };
2107 order_entry.is_leverage(is_leverage_value);
2108
2109 let order_entry = order_entry.build().map_err(|e| anyhow::anyhow!(e))?;
2110
2111 let mut params = BybitPlaceOrderParamsBuilder::default();
2112 params.category(product_type);
2113 params.order(order_entry);
2114
2115 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2116
2117 let body = serde_json::to_value(¶ms)?;
2118 let response = self.inner.place_order(&body).await?;
2119
2120 let order_id = response
2121 .result
2122 .order_id
2123 .ok_or_else(|| anyhow::anyhow!("No order_id in response"))?;
2124
2125 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2127 query_params.category(product_type);
2128 query_params.order_id(order_id.as_str().to_string());
2129
2130 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2131 let order_response: BybitOpenOrdersResponse = self
2132 .inner
2133 .send_request(
2134 Method::GET,
2135 "/v5/order/realtime",
2136 Some(&query_params),
2137 None,
2138 true,
2139 )
2140 .await?;
2141
2142 let order = order_response
2143 .result
2144 .list
2145 .into_iter()
2146 .next()
2147 .ok_or_else(|| anyhow::anyhow!("No order returned after submission"))?;
2148
2149 if order.order_status == crate::common::enums::BybitOrderStatus::Rejected
2152 && (order.cum_exec_qty.as_str() == "0" || order.cum_exec_qty.is_empty())
2153 {
2154 anyhow::bail!("Order rejected: {}", order.reject_reason);
2155 }
2156
2157 let ts_init = self.generate_ts_init();
2158
2159 parse_order_status_report(&order, &instrument, account_id, ts_init)
2160 }
2161
2162 pub async fn cancel_order(
2172 &self,
2173 account_id: AccountId,
2174 product_type: BybitProductType,
2175 instrument_id: InstrumentId,
2176 client_order_id: Option<ClientOrderId>,
2177 venue_order_id: Option<VenueOrderId>,
2178 ) -> anyhow::Result<OrderStatusReport> {
2179 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2180 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2181
2182 let mut cancel_entry = BybitBatchCancelOrderEntryBuilder::default();
2183 cancel_entry.symbol(bybit_symbol.raw_symbol().to_string());
2184
2185 if let Some(venue_order_id) = venue_order_id {
2186 cancel_entry.order_id(venue_order_id.to_string());
2187 } else if let Some(client_order_id) = client_order_id {
2188 cancel_entry.order_link_id(client_order_id.to_string());
2189 } else {
2190 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2191 }
2192
2193 let cancel_entry = cancel_entry.build().map_err(|e| anyhow::anyhow!(e))?;
2194
2195 let mut params = BybitCancelOrderParamsBuilder::default();
2196 params.category(product_type);
2197 params.order(cancel_entry);
2198
2199 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2200 let body = serde_json::to_vec(¶ms)?;
2201
2202 let response: BybitPlaceOrderResponse = self
2203 .inner
2204 .send_request::<_, ()>(Method::POST, "/v5/order/cancel", None, Some(body), true)
2205 .await?;
2206
2207 let order_id = response
2208 .result
2209 .order_id
2210 .ok_or_else(|| anyhow::anyhow!("No order_id in cancel response"))?;
2211
2212 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2214 query_params.category(product_type);
2215 query_params.order_id(order_id.as_str().to_string());
2216
2217 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2218 let order_response: BybitOrderHistoryResponse = self
2219 .inner
2220 .send_request(
2221 Method::GET,
2222 "/v5/order/history",
2223 Some(&query_params),
2224 None,
2225 true,
2226 )
2227 .await?;
2228
2229 let order = order_response
2230 .result
2231 .list
2232 .into_iter()
2233 .next()
2234 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
2235
2236 let ts_init = self.generate_ts_init();
2237
2238 parse_order_status_report(&order, &instrument, account_id, ts_init)
2239 }
2240
2241 pub async fn batch_cancel_orders(
2251 &self,
2252 account_id: AccountId,
2253 product_type: BybitProductType,
2254 instrument_ids: Vec<InstrumentId>,
2255 client_order_ids: Vec<Option<ClientOrderId>>,
2256 venue_order_ids: Vec<Option<VenueOrderId>>,
2257 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2258 if instrument_ids.len() != client_order_ids.len()
2259 || instrument_ids.len() != venue_order_ids.len()
2260 {
2261 anyhow::bail!(
2262 "instrument_ids, client_order_ids, and venue_order_ids must have the same length"
2263 );
2264 }
2265
2266 if instrument_ids.is_empty() {
2267 return Ok(Vec::new());
2268 }
2269
2270 if instrument_ids.len() > 20 {
2271 anyhow::bail!("Batch cancel limit is 20 orders per request");
2272 }
2273
2274 let mut cancel_entries = Vec::new();
2275
2276 for ((instrument_id, client_order_id), venue_order_id) in instrument_ids
2277 .iter()
2278 .zip(client_order_ids.iter())
2279 .zip(venue_order_ids.iter())
2280 {
2281 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2282 let mut cancel_entry = BybitBatchCancelOrderEntryBuilder::default();
2283 cancel_entry.symbol(bybit_symbol.raw_symbol().to_string());
2284
2285 if let Some(venue_order_id) = venue_order_id {
2286 cancel_entry.order_id(venue_order_id.to_string());
2287 } else if let Some(client_order_id) = client_order_id {
2288 cancel_entry.order_link_id(client_order_id.to_string());
2289 } else {
2290 anyhow::bail!(
2291 "Either client_order_id or venue_order_id must be provided for each order"
2292 );
2293 }
2294
2295 cancel_entries.push(cancel_entry.build().map_err(|e| anyhow::anyhow!(e))?);
2296 }
2297
2298 let mut params = BybitBatchCancelOrderParamsBuilder::default();
2299 params.category(product_type);
2300 params.request(cancel_entries);
2301
2302 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2303 let body = serde_json::to_vec(¶ms)?;
2304
2305 let _response: BybitPlaceOrderResponse = self
2306 .inner
2307 .send_request::<_, ()>(
2308 Method::POST,
2309 "/v5/order/cancel-batch",
2310 None,
2311 Some(body),
2312 true,
2313 )
2314 .await?;
2315
2316 let mut reports = Vec::new();
2318 for (instrument_id, (client_order_id, venue_order_id)) in instrument_ids
2319 .iter()
2320 .zip(client_order_ids.iter().zip(venue_order_ids.iter()))
2321 {
2322 let Ok(instrument) = self.instrument_from_cache(&instrument_id.symbol) else {
2323 log::debug!(
2324 "Skipping cancelled order report for instrument not in cache: symbol={}",
2325 instrument_id.symbol
2326 );
2327 continue;
2328 };
2329
2330 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2331
2332 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2333 query_params.category(product_type);
2334 query_params.symbol(bybit_symbol.raw_symbol().to_string());
2335
2336 if let Some(venue_order_id) = venue_order_id {
2337 query_params.order_id(venue_order_id.to_string());
2338 } else if let Some(client_order_id) = client_order_id {
2339 query_params.order_link_id(client_order_id.to_string());
2340 }
2341
2342 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2343 let order_response: BybitOrderHistoryResponse = self
2344 .inner
2345 .send_request(
2346 Method::GET,
2347 "/v5/order/history",
2348 Some(&query_params),
2349 None,
2350 true,
2351 )
2352 .await?;
2353
2354 if let Some(order) = order_response.result.list.into_iter().next() {
2355 let ts_init = self.generate_ts_init();
2356 let report = parse_order_status_report(&order, &instrument, account_id, ts_init)?;
2357 reports.push(report);
2358 }
2359 }
2360
2361 Ok(reports)
2362 }
2363
2364 pub async fn cancel_all_orders(
2373 &self,
2374 account_id: AccountId,
2375 product_type: BybitProductType,
2376 instrument_id: InstrumentId,
2377 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2378 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2379 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2380
2381 let mut params = BybitCancelAllOrdersParamsBuilder::default();
2382 params.category(product_type);
2383 params.symbol(bybit_symbol.raw_symbol().to_string());
2384
2385 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2386 let body = serde_json::to_vec(¶ms)?;
2387
2388 let _response: crate::common::models::BybitListResponse<serde_json::Value> = self
2389 .inner
2390 .send_request::<_, ()>(Method::POST, "/v5/order/cancel-all", None, Some(body), true)
2391 .await?;
2392
2393 let mut query_params = BybitOrderHistoryParamsBuilder::default();
2395 query_params.category(product_type);
2396 query_params.symbol(bybit_symbol.raw_symbol().to_string());
2397 query_params.limit(50u32);
2398
2399 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2400 let order_response: BybitOrderHistoryResponse = self
2401 .inner
2402 .send_request(
2403 Method::GET,
2404 "/v5/order/history",
2405 Some(&query_params),
2406 None,
2407 true,
2408 )
2409 .await?;
2410
2411 let ts_init = self.generate_ts_init();
2412
2413 let mut reports = Vec::new();
2414 for order in order_response.result.list {
2415 if let Ok(report) = parse_order_status_report(&order, &instrument, account_id, ts_init)
2416 {
2417 reports.push(report);
2418 }
2419 }
2420
2421 Ok(reports)
2422 }
2423
2424 #[allow(clippy::too_many_arguments)]
2435 pub async fn modify_order(
2436 &self,
2437 account_id: AccountId,
2438 product_type: BybitProductType,
2439 instrument_id: InstrumentId,
2440 client_order_id: Option<ClientOrderId>,
2441 venue_order_id: Option<VenueOrderId>,
2442 quantity: Option<Quantity>,
2443 price: Option<Price>,
2444 ) -> anyhow::Result<OrderStatusReport> {
2445 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2446 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2447
2448 let mut amend_entry = BybitBatchAmendOrderEntryBuilder::default();
2449 amend_entry.symbol(bybit_symbol.raw_symbol().to_string());
2450
2451 if let Some(venue_order_id) = venue_order_id {
2452 amend_entry.order_id(venue_order_id.to_string());
2453 } else if let Some(client_order_id) = client_order_id {
2454 amend_entry.order_link_id(client_order_id.to_string());
2455 } else {
2456 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2457 }
2458
2459 if let Some(quantity) = quantity {
2460 amend_entry.qty(Some(quantity.to_string()));
2461 }
2462
2463 if let Some(price) = price {
2464 amend_entry.price(Some(price.to_string()));
2465 }
2466
2467 let amend_entry = amend_entry.build().map_err(|e| anyhow::anyhow!(e))?;
2468
2469 let mut params = BybitAmendOrderParamsBuilder::default();
2470 params.category(product_type);
2471 params.order(amend_entry);
2472
2473 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2474 let body = serde_json::to_vec(¶ms)?;
2475
2476 let response: BybitPlaceOrderResponse = self
2477 .inner
2478 .send_request::<_, ()>(Method::POST, "/v5/order/amend", None, Some(body), true)
2479 .await?;
2480
2481 let order_id = response
2482 .result
2483 .order_id
2484 .ok_or_else(|| anyhow::anyhow!("No order_id in amend response"))?;
2485
2486 let mut query_params = BybitOpenOrdersParamsBuilder::default();
2488 query_params.category(product_type);
2489 query_params.order_id(order_id.as_str().to_string());
2490
2491 let query_params = query_params.build().map_err(|e| anyhow::anyhow!(e))?;
2492 let order_response: BybitOpenOrdersResponse = self
2493 .inner
2494 .send_request(
2495 Method::GET,
2496 "/v5/order/realtime",
2497 Some(&query_params),
2498 None,
2499 true,
2500 )
2501 .await?;
2502
2503 let order = order_response
2504 .result
2505 .list
2506 .into_iter()
2507 .next()
2508 .ok_or_else(|| anyhow::anyhow!("No order returned after modification"))?;
2509
2510 let ts_init = self.generate_ts_init();
2511
2512 parse_order_status_report(&order, &instrument, account_id, ts_init)
2513 }
2514
2515 pub async fn query_order(
2524 &self,
2525 account_id: AccountId,
2526 product_type: BybitProductType,
2527 instrument_id: InstrumentId,
2528 client_order_id: Option<ClientOrderId>,
2529 venue_order_id: Option<VenueOrderId>,
2530 ) -> anyhow::Result<Option<OrderStatusReport>> {
2531 log::debug!(
2532 "query_order: instrument_id={instrument_id}, client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?}"
2533 );
2534
2535 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2536
2537 let mut params = BybitOpenOrdersParamsBuilder::default();
2538 params.category(product_type);
2539 params.symbol(bybit_symbol.raw_symbol().to_string());
2541
2542 if let Some(venue_order_id) = venue_order_id {
2543 params.order_id(venue_order_id.to_string());
2544 } else if let Some(client_order_id) = client_order_id {
2545 params.order_link_id(client_order_id.to_string());
2546 } else {
2547 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2548 }
2549
2550 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2551 let mut response: BybitOpenOrdersResponse = self
2552 .inner
2553 .send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
2554 .await?;
2555
2556 if response.result.list.is_empty() {
2557 log::debug!("Order not found in open orders, trying with StopOrder filter");
2558
2559 let mut stop_params = BybitOpenOrdersParamsBuilder::default();
2560 stop_params.category(product_type);
2561 stop_params.symbol(bybit_symbol.raw_symbol().to_string());
2562 stop_params.order_filter(BybitOrderFilter::StopOrder);
2563
2564 if let Some(venue_order_id) = venue_order_id {
2565 stop_params.order_id(venue_order_id.to_string());
2566 } else if let Some(client_order_id) = client_order_id {
2567 stop_params.order_link_id(client_order_id.to_string());
2568 }
2569
2570 let stop_params = stop_params.build().map_err(|e| anyhow::anyhow!(e))?;
2571 response = self
2572 .inner
2573 .send_request(
2574 Method::GET,
2575 "/v5/order/realtime",
2576 Some(&stop_params),
2577 None,
2578 true,
2579 )
2580 .await?;
2581 }
2582
2583 if response.result.list.is_empty() {
2585 log::debug!("Order not found in open orders, checking order history");
2586
2587 let mut history_params = BybitOrderHistoryParamsBuilder::default();
2588 history_params.category(product_type);
2589 history_params.symbol(bybit_symbol.raw_symbol().to_string());
2590
2591 if let Some(venue_order_id) = venue_order_id {
2592 history_params.order_id(venue_order_id.to_string());
2593 } else if let Some(client_order_id) = client_order_id {
2594 history_params.order_link_id(client_order_id.to_string());
2595 }
2596
2597 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2598
2599 let mut history_response: BybitOrderHistoryResponse = self
2600 .inner
2601 .send_request(
2602 Method::GET,
2603 "/v5/order/history",
2604 Some(&history_params),
2605 None,
2606 true,
2607 )
2608 .await?;
2609
2610 if history_response.result.list.is_empty() {
2611 log::debug!("Order not found in order history, trying with StopOrder filter");
2612
2613 let mut stop_history_params = BybitOrderHistoryParamsBuilder::default();
2614 stop_history_params.category(product_type);
2615 stop_history_params.symbol(bybit_symbol.raw_symbol().to_string());
2616 stop_history_params.order_filter(BybitOrderFilter::StopOrder);
2617
2618 if let Some(venue_order_id) = venue_order_id {
2619 stop_history_params.order_id(venue_order_id.to_string());
2620 } else if let Some(client_order_id) = client_order_id {
2621 stop_history_params.order_link_id(client_order_id.to_string());
2622 }
2623
2624 let stop_history_params = stop_history_params
2625 .build()
2626 .map_err(|e| anyhow::anyhow!(e))?;
2627
2628 history_response = self
2629 .inner
2630 .send_request(
2631 Method::GET,
2632 "/v5/order/history",
2633 Some(&stop_history_params),
2634 None,
2635 true,
2636 )
2637 .await?;
2638
2639 if history_response.result.list.is_empty() {
2640 log::debug!("Order not found in order history with StopOrder filter either");
2641 return Ok(None);
2642 }
2643 }
2644
2645 response.result.list = history_response.result.list;
2647 }
2648
2649 let order = &response.result.list[0];
2650 let ts_init = self.generate_ts_init();
2651
2652 log::debug!(
2653 "Query order response: symbol={}, order_id={}, order_link_id={}",
2654 order.symbol.as_str(),
2655 order.order_id.as_str(),
2656 order.order_link_id.as_str()
2657 );
2658
2659 let instrument = self
2660 .instrument_from_cache(&instrument_id.symbol)
2661 .map_err(|e| {
2662 log::error!(
2663 "Instrument cache miss for symbol '{}': {}",
2664 instrument_id.symbol.as_str(),
2665 e
2666 );
2667 anyhow::anyhow!(
2668 "Failed to query order {}: {}",
2669 client_order_id
2670 .as_ref()
2671 .map(|id| id.to_string())
2672 .or_else(|| venue_order_id.as_ref().map(|id| id.to_string()))
2673 .unwrap_or_else(|| "unknown".to_string()),
2674 e
2675 )
2676 })?;
2677
2678 log::debug!("Retrieved instrument from cache: id={}", instrument.id());
2679
2680 let report =
2681 parse_order_status_report(order, &instrument, account_id, ts_init).map_err(|e| {
2682 log::error!(
2683 "Failed to parse order status report for {}: {}",
2684 order.order_link_id.as_str(),
2685 e
2686 );
2687 e
2688 })?;
2689
2690 log::debug!(
2691 "Successfully created OrderStatusReport for {}",
2692 order.order_link_id.as_str()
2693 );
2694
2695 Ok(Some(report))
2696 }
2697
2698 pub async fn request_instruments(
2704 &self,
2705 product_type: BybitProductType,
2706 symbol: Option<String>,
2707 ) -> anyhow::Result<Vec<InstrumentAny>> {
2708 let ts_init = self.generate_ts_init();
2709
2710 let mut instruments = Vec::new();
2711
2712 let default_fee_rate = |symbol: ustr::Ustr| BybitFeeRate {
2713 symbol,
2714 taker_fee_rate: "0.001".to_string(),
2715 maker_fee_rate: "0.001".to_string(),
2716 base_coin: None,
2717 };
2718
2719 match product_type {
2720 BybitProductType::Spot => {
2721 let fee_map: AHashMap<_, _> = {
2723 let mut fee_params = BybitFeeRateParamsBuilder::default();
2724 fee_params.category(product_type);
2725 if let Ok(params) = fee_params.build() {
2726 match self.inner.get_fee_rate(¶ms).await {
2727 Ok(fee_response) => fee_response
2728 .result
2729 .list
2730 .into_iter()
2731 .map(|f| (f.symbol, f))
2732 .collect(),
2733 Err(BybitHttpError::MissingCredentials) => {
2734 log::warn!("Missing credentials for fee rates, using defaults");
2735 AHashMap::new()
2736 }
2737 Err(e) => return Err(e.into()),
2738 }
2739 } else {
2740 AHashMap::new()
2741 }
2742 };
2743
2744 let mut cursor: Option<String> = None;
2745
2746 loop {
2747 let params = BybitInstrumentsInfoParams {
2748 category: product_type,
2749 symbol: symbol.clone(),
2750 status: None,
2751 base_coin: None,
2752 limit: Some(1000),
2753 cursor: cursor.clone(),
2754 };
2755
2756 let response: BybitInstrumentSpotResponse =
2757 self.inner.get_instruments(¶ms).await?;
2758
2759 for definition in response.result.list {
2760 let fee_rate = fee_map
2761 .get(&definition.symbol)
2762 .cloned()
2763 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2764 if let Ok(instrument) =
2765 parse_spot_instrument(&definition, &fee_rate, ts_init, ts_init)
2766 {
2767 instruments.push(instrument);
2768 }
2769 }
2770
2771 cursor = response.result.next_page_cursor;
2772 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2773 break;
2774 }
2775 }
2776 }
2777 BybitProductType::Linear => {
2778 let fee_map: AHashMap<_, _> = {
2780 let mut fee_params = BybitFeeRateParamsBuilder::default();
2781 fee_params.category(product_type);
2782 if let Ok(params) = fee_params.build() {
2783 match self.inner.get_fee_rate(¶ms).await {
2784 Ok(fee_response) => fee_response
2785 .result
2786 .list
2787 .into_iter()
2788 .map(|f| (f.symbol, f))
2789 .collect(),
2790 Err(BybitHttpError::MissingCredentials) => {
2791 log::warn!("Missing credentials for fee rates, using defaults");
2792 AHashMap::new()
2793 }
2794 Err(e) => return Err(e.into()),
2795 }
2796 } else {
2797 AHashMap::new()
2798 }
2799 };
2800
2801 let mut cursor: Option<String> = None;
2802
2803 loop {
2804 let params = BybitInstrumentsInfoParams {
2805 category: product_type,
2806 symbol: symbol.clone(),
2807 status: None,
2808 base_coin: None,
2809 limit: Some(1000),
2810 cursor: cursor.clone(),
2811 };
2812
2813 let response: BybitInstrumentLinearResponse =
2814 self.inner.get_instruments(¶ms).await?;
2815
2816 for definition in response.result.list {
2817 let fee_rate = fee_map
2818 .get(&definition.symbol)
2819 .cloned()
2820 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2821 if let Ok(instrument) =
2822 parse_linear_instrument(&definition, &fee_rate, ts_init, ts_init)
2823 {
2824 instruments.push(instrument);
2825 }
2826 }
2827
2828 cursor = response.result.next_page_cursor;
2829 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2830 break;
2831 }
2832 }
2833 }
2834 BybitProductType::Inverse => {
2835 let fee_map: AHashMap<_, _> = {
2837 let mut fee_params = BybitFeeRateParamsBuilder::default();
2838 fee_params.category(product_type);
2839 if let Ok(params) = fee_params.build() {
2840 match self.inner.get_fee_rate(¶ms).await {
2841 Ok(fee_response) => fee_response
2842 .result
2843 .list
2844 .into_iter()
2845 .map(|f| (f.symbol, f))
2846 .collect(),
2847 Err(BybitHttpError::MissingCredentials) => {
2848 log::warn!("Missing credentials for fee rates, using defaults");
2849 AHashMap::new()
2850 }
2851 Err(e) => return Err(e.into()),
2852 }
2853 } else {
2854 AHashMap::new()
2855 }
2856 };
2857
2858 let mut cursor: Option<String> = None;
2859
2860 loop {
2861 let params = BybitInstrumentsInfoParams {
2862 category: product_type,
2863 symbol: symbol.clone(),
2864 status: None,
2865 base_coin: None,
2866 limit: Some(1000),
2867 cursor: cursor.clone(),
2868 };
2869
2870 let response: BybitInstrumentInverseResponse =
2871 self.inner.get_instruments(¶ms).await?;
2872
2873 for definition in response.result.list {
2874 let fee_rate = fee_map
2875 .get(&definition.symbol)
2876 .cloned()
2877 .unwrap_or_else(|| default_fee_rate(definition.symbol));
2878 if let Ok(instrument) =
2879 parse_inverse_instrument(&definition, &fee_rate, ts_init, ts_init)
2880 {
2881 instruments.push(instrument);
2882 }
2883 }
2884
2885 cursor = response.result.next_page_cursor;
2886 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2887 break;
2888 }
2889 }
2890 }
2891 BybitProductType::Option => {
2892 let mut cursor: Option<String> = None;
2893
2894 loop {
2895 let params = BybitInstrumentsInfoParams {
2896 category: product_type,
2897 symbol: symbol.clone(),
2898 status: None,
2899 base_coin: None,
2900 limit: Some(1000),
2901 cursor: cursor.clone(),
2902 };
2903
2904 let response: BybitInstrumentOptionResponse =
2905 self.inner.get_instruments(¶ms).await?;
2906
2907 for definition in response.result.list {
2908 if let Ok(instrument) =
2909 parse_option_instrument(&definition, ts_init, ts_init)
2910 {
2911 instruments.push(instrument);
2912 }
2913 }
2914
2915 cursor = response.result.next_page_cursor;
2916 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
2917 break;
2918 }
2919 }
2920 }
2921 }
2922
2923 for instrument in &instruments {
2924 self.cache_instrument(instrument.clone());
2925 }
2926
2927 Ok(instruments)
2928 }
2929
2930 pub async fn request_tickers(
2943 &self,
2944 params: &BybitTickersParams,
2945 ) -> anyhow::Result<Vec<BybitTickerData>> {
2946 use super::models::{
2947 BybitTickersLinearResponse, BybitTickersOptionResponse, BybitTickersSpotResponse,
2948 };
2949
2950 match params.category {
2951 BybitProductType::Spot => {
2952 let response: BybitTickersSpotResponse = self.inner.get_tickers(params).await?;
2953 Ok(response.result.list.into_iter().map(Into::into).collect())
2954 }
2955 BybitProductType::Linear | BybitProductType::Inverse => {
2956 let response: BybitTickersLinearResponse = self.inner.get_tickers(params).await?;
2957 Ok(response.result.list.into_iter().map(Into::into).collect())
2958 }
2959 BybitProductType::Option => {
2960 let response: BybitTickersOptionResponse = self.inner.get_tickers(params).await?;
2961 Ok(response.result.list.into_iter().map(Into::into).collect())
2962 }
2963 }
2964 }
2965
2966 pub async fn request_trades(
2986 &self,
2987 product_type: BybitProductType,
2988 instrument_id: InstrumentId,
2989 limit: Option<u32>,
2990 ) -> anyhow::Result<Vec<TradeTick>> {
2991 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
2992 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
2993
2994 let mut params_builder = BybitTradesParamsBuilder::default();
2995 params_builder.category(product_type);
2996 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
2997 if let Some(limit_val) = limit {
2998 params_builder.limit(limit_val);
2999 }
3000
3001 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
3002 let response = self.inner.get_recent_trades(¶ms).await?;
3003
3004 let mut trades = Vec::new();
3005
3006 for trade in response.result.list {
3007 if let Ok(trade_tick) = parse_trade_tick(&trade, &instrument, None) {
3008 trades.push(trade_tick);
3009 }
3010 }
3011
3012 Ok(trades)
3013 }
3014
3015 pub async fn request_funding_rates(
3028 &self,
3029 product_type: BybitProductType,
3030 instrument_id: InstrumentId,
3031 start: Option<DateTime<Utc>>,
3032 end: Option<DateTime<Utc>>,
3033 limit: Option<u32>,
3034 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
3035 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
3036 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
3037
3038 let start_ms = start.map(|dt| dt.timestamp_millis());
3039 let mut seen_timestamps: AHashSet<i64> = AHashSet::new();
3040
3041 let mut pages: Vec<Vec<FundingRateUpdate>> = Vec::new();
3042 let mut total_funding_rates = 0usize;
3043 let mut current_end = end.map(|dt| dt.timestamp_millis());
3044
3045 loop {
3046 let mut params_builder = BybitFundingParamsBuilder::default();
3047 params_builder.category(product_type);
3048 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
3049 params_builder.limit(200u32); if let Some(start_val) = start_ms {
3052 params_builder.start_time(start_val);
3053 }
3054 if let Some(end_val) = current_end {
3055 params_builder.end_time(end_val);
3056 }
3057
3058 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
3059 let response = self.inner.get_funding_history(¶ms).await?;
3060
3061 let funding_rates = response.result.list;
3062 if funding_rates.is_empty() {
3063 break;
3064 }
3065
3066 let mut funding_rates_with_ts: Vec<(i64, _)> = funding_rates
3067 .into_iter()
3068 .filter_map(|f| {
3069 f.funding_rate_timestamp
3070 .parse::<i64>()
3071 .ok()
3072 .map(|ts| (ts, f))
3073 })
3074 .collect();
3075
3076 funding_rates_with_ts.sort_by_key(|(ts, _)| *ts);
3077
3078 let has_new = funding_rates_with_ts
3079 .iter()
3080 .any(|(ts, _)| !seen_timestamps.contains(ts));
3081 if !has_new {
3082 break;
3083 }
3084
3085 let mut page_funding_rates = Vec::with_capacity(funding_rates_with_ts.len());
3086
3087 let mut earliest_ts: Option<i64> = None;
3088
3089 for (time, funding) in &funding_rates_with_ts {
3090 if earliest_ts.is_none_or(|ts| *time < ts) {
3092 earliest_ts = Some(*time);
3093 }
3094
3095 if !seen_timestamps.contains(time)
3096 && let Ok(funding_rate) = parse_funding_rate(funding, &instrument)
3097 {
3098 page_funding_rates.push(funding_rate);
3099 seen_timestamps.insert(*time);
3100 }
3101 }
3102
3103 total_funding_rates += page_funding_rates.len();
3104 pages.push(page_funding_rates);
3105
3106 if let Some(limit_val) = limit
3108 && total_funding_rates >= limit_val as usize
3109 {
3110 break;
3111 }
3112
3113 let Some(earliest_funding_time) = earliest_ts else {
3116 break;
3117 };
3118 if let Some(start_val) = start_ms
3119 && earliest_funding_time <= start_val
3120 {
3121 break;
3122 }
3123
3124 current_end = Some(earliest_funding_time - 1);
3125 }
3126
3127 let mut all_funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(total_funding_rates);
3129 for page in pages.into_iter().rev() {
3130 all_funding_rates.extend(page);
3131 }
3132
3133 if let Some(limit_val) = limit {
3135 let limit_usize = limit_val as usize;
3136 if all_funding_rates.len() > limit_usize {
3137 let start_idx = all_funding_rates.len() - limit_usize;
3138 return Ok(all_funding_rates[start_idx..].to_vec());
3139 }
3140 }
3141
3142 Ok(all_funding_rates)
3143 }
3144
3145 pub async fn request_orderbook_snapshot(
3163 &self,
3164 product_type: BybitProductType,
3165 instrument_id: InstrumentId,
3166 limit: Option<u32>,
3167 ) -> anyhow::Result<OrderBookDeltas> {
3168 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
3169 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())?;
3170
3171 let mut params_builder = BybitOrderbookParamsBuilder::default();
3172 params_builder.category(product_type);
3173 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
3174
3175 if let Some(limit) = limit {
3176 let max_limit = match product_type {
3177 BybitProductType::Spot => 200,
3178 BybitProductType::Option => 25,
3179 BybitProductType::Linear | BybitProductType::Inverse => 500,
3180 };
3181 let clamped_limit = limit.min(max_limit);
3182 if limit > max_limit {
3183 log::warn!(
3184 "Bybit orderbook snapshot request depth limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
3185 );
3186 }
3187 params_builder.limit(clamped_limit);
3188 }
3189
3190 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
3191 let response = self.inner.get_orderbook(¶ms).await?;
3192
3193 let deltas = parse_orderbook(&response.result, &instrument, None)?;
3194
3195 Ok(deltas)
3196 }
3197
3198 pub async fn request_bars(
3211 &self,
3212 product_type: BybitProductType,
3213 bar_type: BarType,
3214 start: Option<DateTime<Utc>>,
3215 end: Option<DateTime<Utc>>,
3216 limit: Option<u32>,
3217 timestamp_on_close: bool,
3218 ) -> anyhow::Result<Vec<Bar>> {
3219 let instrument = self.instrument_from_cache(&bar_type.instrument_id().symbol)?;
3220 let bybit_symbol = BybitSymbol::new(bar_type.instrument_id().symbol.as_str())?;
3221
3222 let interval = bar_spec_to_bybit_interval(
3224 bar_type.spec().aggregation,
3225 bar_type.spec().step.get() as u64,
3226 )?;
3227
3228 let start_ms = start.map(|dt| dt.timestamp_millis());
3229 let mut seen_timestamps: AHashSet<i64> = AHashSet::new();
3230 let current_time_ms = get_atomic_clock_realtime().get_time_ms() as i64;
3231
3232 let mut pages: Vec<Vec<Bar>> = Vec::new();
3242 let mut total_bars = 0usize;
3243 let mut current_end = end.map(|dt| dt.timestamp_millis());
3244 let mut page_count = 0;
3245
3246 loop {
3247 page_count += 1;
3248
3249 let mut params_builder = BybitKlinesParamsBuilder::default();
3250 params_builder.category(product_type);
3251 params_builder.symbol(bybit_symbol.raw_symbol().to_string());
3252 params_builder.interval(interval);
3253 params_builder.limit(1000u32); if let Some(start_val) = start_ms {
3256 params_builder.start(start_val);
3257 }
3258 if let Some(end_val) = current_end {
3259 params_builder.end(end_val);
3260 }
3261
3262 let params = params_builder.build().map_err(|e| anyhow::anyhow!(e))?;
3263 let response = self.inner.get_klines(¶ms).await?;
3264
3265 let klines = response.result.list;
3266 if klines.is_empty() {
3267 break;
3268 }
3269
3270 let mut klines_with_ts: Vec<(i64, _)> = klines
3272 .into_iter()
3273 .filter_map(|k| k.start.parse::<i64>().ok().map(|ts| (ts, k)))
3274 .collect();
3275
3276 klines_with_ts.sort_by_key(|(ts, _)| *ts);
3277
3278 let has_new = klines_with_ts
3280 .iter()
3281 .any(|(ts, _)| !seen_timestamps.contains(ts));
3282 if !has_new {
3283 break;
3284 }
3285
3286 let mut page_bars = Vec::with_capacity(klines_with_ts.len());
3287
3288 let mut earliest_ts: Option<i64> = None;
3289
3290 for (start_time, kline) in &klines_with_ts {
3291 if earliest_ts.is_none_or(|ts| *start_time < ts) {
3293 earliest_ts = Some(*start_time);
3294 }
3295
3296 let bar_end_time = interval.bar_end_time_ms(*start_time);
3297 if bar_end_time > current_time_ms {
3298 continue;
3299 }
3300
3301 if !seen_timestamps.contains(start_time)
3302 && let Ok(bar) =
3303 parse_kline_bar(kline, &instrument, bar_type, timestamp_on_close, None)
3304 {
3305 page_bars.push(bar);
3306 seen_timestamps.insert(*start_time);
3307 }
3308 }
3309
3310 total_bars += page_bars.len();
3313 pages.push(page_bars);
3314
3315 if let Some(limit_val) = limit
3317 && total_bars >= limit_val as usize
3318 {
3319 break;
3320 }
3321
3322 let Some(earliest_bar_time) = earliest_ts else {
3325 break;
3326 };
3327 if let Some(start_val) = start_ms
3328 && earliest_bar_time <= start_val
3329 {
3330 break;
3331 }
3332
3333 current_end = Some(earliest_bar_time - 1);
3334
3335 if page_count > 100 {
3337 break;
3338 }
3339 }
3340
3341 let mut all_bars: Vec<Bar> = Vec::with_capacity(total_bars);
3343 for page in pages.into_iter().rev() {
3344 all_bars.extend(page);
3345 }
3346
3347 if let Some(limit_val) = limit {
3349 let limit_usize = limit_val as usize;
3350 if all_bars.len() > limit_usize {
3351 let start_idx = all_bars.len() - limit_usize;
3352 return Ok(all_bars[start_idx..].to_vec());
3353 }
3354 }
3355
3356 Ok(all_bars)
3357 }
3358
3359 pub async fn request_fee_rates(
3371 &self,
3372 product_type: BybitProductType,
3373 symbol: Option<String>,
3374 base_coin: Option<String>,
3375 ) -> anyhow::Result<Vec<BybitFeeRate>> {
3376 let params = BybitFeeRateParams {
3377 category: product_type,
3378 symbol,
3379 base_coin,
3380 };
3381
3382 let response = self.inner.get_fee_rate(¶ms).await?;
3383 Ok(response.result.list)
3384 }
3385
3386 pub async fn request_account_state(
3398 &self,
3399 account_type: BybitAccountType,
3400 account_id: AccountId,
3401 ) -> anyhow::Result<AccountState> {
3402 let params = BybitWalletBalanceParams {
3403 account_type,
3404 coin: None,
3405 };
3406
3407 let response = self.inner.get_wallet_balance(¶ms).await?;
3408 let ts_init = self.generate_ts_init();
3409
3410 let wallet_balance = response
3412 .result
3413 .list
3414 .first()
3415 .ok_or_else(|| anyhow::anyhow!("No wallet balance found in response"))?;
3416
3417 parse_account_state(wallet_balance, account_id, ts_init)
3418 }
3419
3420 #[allow(clippy::too_many_arguments)]
3431 pub async fn request_order_status_reports(
3432 &self,
3433 account_id: AccountId,
3434 product_type: BybitProductType,
3435 instrument_id: Option<InstrumentId>,
3436 open_only: bool,
3437 start: Option<DateTime<Utc>>,
3438 end: Option<DateTime<Utc>>,
3439 limit: Option<u32>,
3440 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3441 let symbol_param = if let Some(id) = instrument_id.as_ref() {
3443 let symbol_str = id.symbol.as_str();
3444 if symbol_str.is_empty() {
3445 None
3446 } else {
3447 Some(BybitSymbol::new(symbol_str)?.raw_symbol().to_string())
3448 }
3449 } else {
3450 None
3451 };
3452
3453 let settle_coins_to_query: Vec<Option<String>> =
3456 if product_type == BybitProductType::Linear && symbol_param.is_none() {
3457 vec![Some("USDT".to_string()), Some("USDC".to_string())]
3458 } else {
3459 match product_type {
3460 BybitProductType::Inverse => vec![None],
3461 _ => vec![None],
3462 }
3463 };
3464
3465 let mut all_collected_orders = Vec::new();
3466 let mut total_collected_across_coins = 0;
3467
3468 for settle_coin in settle_coins_to_query {
3469 let remaining_limit = if let Some(limit) = limit {
3470 let remaining = (limit as usize).saturating_sub(total_collected_across_coins);
3471 if remaining == 0 {
3472 break;
3473 }
3474 Some(remaining as u32)
3475 } else {
3476 None
3477 };
3478
3479 let orders_for_coin = if open_only {
3480 let mut all_orders = Vec::new();
3481 let mut cursor: Option<String> = None;
3482 let mut total_orders = 0;
3483
3484 loop {
3485 let remaining = if let Some(limit) = remaining_limit {
3486 (limit as usize).saturating_sub(total_orders)
3487 } else {
3488 usize::MAX
3489 };
3490
3491 if remaining == 0 {
3492 break;
3493 }
3494
3495 let page_limit = std::cmp::min(remaining, 50);
3497
3498 let mut p = BybitOpenOrdersParamsBuilder::default();
3499 p.category(product_type);
3500 if let Some(symbol) = symbol_param.clone() {
3501 p.symbol(symbol);
3502 }
3503 if let Some(coin) = settle_coin.clone() {
3504 p.settle_coin(coin);
3505 }
3506 p.limit(page_limit as u32);
3507 if let Some(c) = cursor {
3508 p.cursor(c);
3509 }
3510 let params = p.build().map_err(|e| anyhow::anyhow!(e))?;
3511 let response: BybitOpenOrdersResponse = self
3512 .inner
3513 .send_request(Method::GET, "/v5/order/realtime", Some(¶ms), None, true)
3514 .await?;
3515
3516 total_orders += response.result.list.len();
3517 all_orders.extend(response.result.list);
3518
3519 cursor = response.result.next_page_cursor;
3520 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
3521 break;
3522 }
3523 }
3524
3525 all_orders
3526 } else {
3527 let mut all_orders = Vec::new();
3530 let mut open_orders = Vec::new();
3531 let mut cursor: Option<String> = None;
3532 let mut total_open_orders = 0;
3533
3534 loop {
3535 let remaining = if let Some(limit) = remaining_limit {
3536 (limit as usize).saturating_sub(total_open_orders)
3537 } else {
3538 usize::MAX
3539 };
3540
3541 if remaining == 0 {
3542 break;
3543 }
3544
3545 let page_limit = std::cmp::min(remaining, 50);
3547
3548 let mut open_params = BybitOpenOrdersParamsBuilder::default();
3549 open_params.category(product_type);
3550 if let Some(symbol) = symbol_param.clone() {
3551 open_params.symbol(symbol);
3552 }
3553 if let Some(coin) = settle_coin.clone() {
3554 open_params.settle_coin(coin);
3555 }
3556 open_params.limit(page_limit as u32);
3557 if let Some(c) = cursor {
3558 open_params.cursor(c);
3559 }
3560 let open_params = open_params.build().map_err(|e| anyhow::anyhow!(e))?;
3561 let open_response: BybitOpenOrdersResponse = self
3562 .inner
3563 .send_request(
3564 Method::GET,
3565 "/v5/order/realtime",
3566 Some(&open_params),
3567 None,
3568 true,
3569 )
3570 .await?;
3571
3572 total_open_orders += open_response.result.list.len();
3573 open_orders.extend(open_response.result.list);
3574
3575 cursor = open_response.result.next_page_cursor;
3576 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3577 break;
3578 }
3579 }
3580
3581 let seen_order_ids: AHashSet<Ustr> =
3582 open_orders.iter().map(|o| o.order_id).collect();
3583
3584 all_orders.extend(open_orders);
3585
3586 let mut cursor: Option<String> = None;
3587 let mut total_history_orders = 0;
3588
3589 loop {
3590 let total_orders = total_open_orders + total_history_orders;
3591 let remaining = if let Some(limit) = remaining_limit {
3592 (limit as usize).saturating_sub(total_orders)
3593 } else {
3594 usize::MAX
3595 };
3596
3597 if remaining == 0 {
3598 break;
3599 }
3600
3601 let page_limit = std::cmp::min(remaining, 50);
3603
3604 let mut history_params = BybitOrderHistoryParamsBuilder::default();
3605 history_params.category(product_type);
3606 if let Some(symbol) = symbol_param.clone() {
3607 history_params.symbol(symbol);
3608 }
3609 if let Some(coin) = settle_coin.clone() {
3610 history_params.settle_coin(coin);
3611 }
3612 if let Some(start) = start {
3613 history_params.start_time(start.timestamp_millis());
3614 }
3615 if let Some(end) = end {
3616 history_params.end_time(end.timestamp_millis());
3617 }
3618 history_params.limit(page_limit as u32);
3619 if let Some(c) = cursor {
3620 history_params.cursor(c);
3621 }
3622 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
3623 let history_response: BybitOrderHistoryResponse = self
3624 .inner
3625 .send_request(
3626 Method::GET,
3627 "/v5/order/history",
3628 Some(&history_params),
3629 None,
3630 true,
3631 )
3632 .await?;
3633
3634 for order in history_response.result.list {
3636 if !seen_order_ids.contains(&order.order_id) {
3637 all_orders.push(order);
3638 total_history_orders += 1;
3639 }
3640 }
3641
3642 cursor = history_response.result.next_page_cursor;
3643 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3644 break;
3645 }
3646 }
3647
3648 all_orders
3649 };
3650
3651 total_collected_across_coins += orders_for_coin.len();
3652 all_collected_orders.extend(orders_for_coin);
3653 }
3654
3655 let ts_init = self.generate_ts_init();
3656
3657 let mut reports = Vec::new();
3658 for order in all_collected_orders {
3659 if let Some(ref instrument_id) = instrument_id {
3660 let instrument = self.instrument_from_cache(&instrument_id.symbol)?;
3661 if let Ok(report) =
3662 parse_order_status_report(&order, &instrument, account_id, ts_init)
3663 {
3664 reports.push(report);
3665 }
3666 } else {
3667 if !order.symbol.is_empty() {
3670 let symbol_with_product =
3671 Symbol::from_ustr_unchecked(make_bybit_symbol(order.symbol, product_type));
3672
3673 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3674 log::debug!(
3675 "Skipping order report for instrument not in cache: symbol={}, full_symbol={}",
3676 order.symbol,
3677 symbol_with_product
3678 );
3679 continue;
3680 };
3681
3682 match parse_order_status_report(&order, &instrument, account_id, ts_init) {
3683 Ok(report) => reports.push(report),
3684 Err(e) => {
3685 log::error!("Failed to parse order status report: {e}");
3686 }
3687 }
3688 }
3689 }
3690 }
3691
3692 Ok(reports)
3693 }
3694
3695 pub async fn request_fill_reports(
3707 &self,
3708 account_id: AccountId,
3709 product_type: BybitProductType,
3710 instrument_id: Option<InstrumentId>,
3711 start: Option<i64>,
3712 end: Option<i64>,
3713 limit: Option<u32>,
3714 ) -> anyhow::Result<Vec<FillReport>> {
3715 let symbol = if let Some(id) = instrument_id {
3717 let bybit_symbol = BybitSymbol::new(id.symbol.as_str())?;
3718 Some(bybit_symbol.raw_symbol().to_string())
3719 } else {
3720 None
3721 };
3722
3723 let mut all_executions = Vec::new();
3725 let mut cursor: Option<String> = None;
3726 let mut total_executions = 0;
3727
3728 loop {
3729 let remaining = if let Some(limit) = limit {
3731 (limit as usize).saturating_sub(total_executions)
3732 } else {
3733 usize::MAX
3734 };
3735
3736 if remaining == 0 {
3738 break;
3739 }
3740
3741 let page_limit = std::cmp::min(remaining, 100);
3743
3744 let params = BybitTradeHistoryParams {
3745 category: product_type,
3746 symbol: symbol.clone(),
3747 base_coin: None,
3748 order_id: None,
3749 order_link_id: None,
3750 start_time: start,
3751 end_time: end,
3752 exec_type: None,
3753 limit: Some(page_limit as u32),
3754 cursor: cursor.clone(),
3755 };
3756
3757 let response = self.inner.get_trade_history(¶ms).await?;
3758 let list_len = response.result.list.len();
3759 all_executions.extend(response.result.list);
3760 total_executions += list_len;
3761
3762 cursor = response.result.next_page_cursor;
3763 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3764 break;
3765 }
3766 }
3767
3768 let ts_init = self.generate_ts_init();
3769 let mut reports = Vec::new();
3770
3771 for execution in all_executions {
3772 let symbol_with_product =
3775 Symbol::from_ustr_unchecked(make_bybit_symbol(execution.symbol, product_type));
3776
3777 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3778 log::debug!(
3779 "Skipping fill report for instrument not in cache: symbol={}, full_symbol={}",
3780 execution.symbol,
3781 symbol_with_product
3782 );
3783 continue;
3784 };
3785
3786 match parse_fill_report(&execution, account_id, &instrument, ts_init) {
3787 Ok(report) => reports.push(report),
3788 Err(e) => {
3789 log::error!("Failed to parse fill report: {e}");
3790 }
3791 }
3792 }
3793
3794 Ok(reports)
3795 }
3796
3797 pub async fn request_position_status_reports(
3809 &self,
3810 account_id: AccountId,
3811 product_type: BybitProductType,
3812 instrument_id: Option<InstrumentId>,
3813 ) -> anyhow::Result<Vec<PositionStatusReport>> {
3814 if product_type == BybitProductType::Spot {
3816 if self.use_spot_position_reports.load(Ordering::Relaxed) {
3817 return self
3818 .generate_spot_position_reports_from_wallet(account_id, instrument_id)
3819 .await;
3820 } else {
3821 return Ok(Vec::new());
3823 }
3824 }
3825
3826 let ts_init = self.generate_ts_init();
3827 let mut reports = Vec::new();
3828
3829 let symbol = if let Some(id) = instrument_id {
3831 let symbol_str = id.symbol.as_str();
3832 if symbol_str.is_empty() {
3833 anyhow::bail!("InstrumentId symbol is empty");
3834 }
3835 let bybit_symbol = BybitSymbol::new(symbol_str)?;
3836 Some(bybit_symbol.raw_symbol().to_string())
3837 } else {
3838 None
3839 };
3840
3841 if product_type == BybitProductType::Linear && symbol.is_none() {
3844 for settle_coin in ["USDT", "USDC"] {
3846 let mut cursor: Option<String> = None;
3847
3848 loop {
3849 let params = BybitPositionListParams {
3850 category: product_type,
3851 symbol: None,
3852 base_coin: None,
3853 settle_coin: Some(settle_coin.to_string()),
3854 limit: Some(200), cursor: cursor.clone(),
3856 };
3857
3858 let response = self.inner.get_positions(¶ms).await?;
3859
3860 for position in response.result.list {
3861 if position.symbol.is_empty() {
3862 continue;
3863 }
3864
3865 let symbol_with_product = Symbol::new(format!(
3866 "{}{}",
3867 position.symbol.as_str(),
3868 product_type.suffix()
3869 ));
3870
3871 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product)
3872 else {
3873 log::debug!(
3874 "Skipping position report for instrument not in cache: symbol={}, full_symbol={}",
3875 position.symbol,
3876 symbol_with_product
3877 );
3878 continue;
3879 };
3880
3881 match parse_position_status_report(
3882 &position,
3883 account_id,
3884 &instrument,
3885 ts_init,
3886 ) {
3887 Ok(report) => reports.push(report),
3888 Err(e) => {
3889 log::error!("Failed to parse position status report: {e}");
3890 }
3891 }
3892 }
3893
3894 cursor = response.result.next_page_cursor;
3895 if cursor.as_ref().is_none_or(|c| c.is_empty()) {
3896 break;
3897 }
3898 }
3899 }
3900 } else {
3901 let mut cursor: Option<String> = None;
3903
3904 loop {
3905 let params = BybitPositionListParams {
3906 category: product_type,
3907 symbol: symbol.clone(),
3908 base_coin: None,
3909 settle_coin: None,
3910 limit: Some(200), cursor: cursor.clone(),
3912 };
3913
3914 let response = self.inner.get_positions(¶ms).await?;
3915
3916 for position in response.result.list {
3917 if position.symbol.is_empty() {
3918 continue;
3919 }
3920
3921 let symbol_with_product = Symbol::new(format!(
3922 "{}{}",
3923 position.symbol.as_str(),
3924 product_type.suffix()
3925 ));
3926
3927 let Ok(instrument) = self.instrument_from_cache(&symbol_with_product) else {
3928 log::debug!(
3929 "Skipping position report for instrument not in cache: symbol={}, full_symbol={}",
3930 position.symbol,
3931 symbol_with_product
3932 );
3933 continue;
3934 };
3935
3936 match parse_position_status_report(&position, account_id, &instrument, ts_init)
3937 {
3938 Ok(report) => reports.push(report),
3939 Err(e) => {
3940 log::error!("Failed to parse position status report: {e}");
3941 }
3942 }
3943 }
3944
3945 cursor = response.result.next_page_cursor;
3946 if cursor.is_none() || cursor.as_ref().is_none_or(|c| c.is_empty()) {
3947 break;
3948 }
3949 }
3950 }
3951
3952 Ok(reports)
3953 }
3954}
3955
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A client built without credentials targets a Bybit URL and carries no credential.
    #[rstest]
    fn test_client_creation() {
        let result = BybitHttpClient::new(None, Some(60), None, None, None, None, None);
        assert!(result.is_ok());

        let http_client = result.unwrap();
        assert!(http_client.base_url().contains("bybit.com"));
        assert!(http_client.credential().is_none());
    }

    /// A client built with an API key and secret exposes a credential.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();
        let base_url = Some("https://api-testnet.bybit.com".to_string());

        let result = BybitHttpClient::with_credentials(
            api_key,
            api_secret,
            base_url,
            Some(60),
            None,
            None,
            None,
            None,
            None,
        );
        assert!(result.is_ok());

        let http_client = result.unwrap();
        assert!(http_client.credential().is_some());
    }

    /// Serializable params end up in the query string of the built path.
    #[rstest]
    fn test_build_path_with_params() {
        #[derive(Serialize)]
        struct TestParams {
            category: String,
            symbol: String,
        }

        let params = TestParams {
            category: "linear".to_string(),
            symbol: "BTCUSDT".to_string(),
        };

        let built = BybitRawHttpClient::build_path("/v5/market/test", &params);
        assert!(built.is_ok());
        assert!(built.unwrap().contains("category=linear"));
    }

    /// Unit params leave the base path untouched.
    #[rstest]
    fn test_build_path_without_params() {
        let params = ();
        let built = BybitRawHttpClient::build_path("/v5/market/time", &params);
        assert!(built.is_ok());
        assert_eq!(built.unwrap(), "/v5/market/time");
    }

    /// The query part produced by `build_path` equals direct `serde_urlencoded` output.
    #[rstest]
    fn test_params_serialization_matches_build_path() {
        #[derive(Serialize)]
        struct TestParams {
            category: String,
            limit: u32,
        }

        let params = TestParams {
            category: "spot".to_string(),
            limit: 50,
        };

        let full_path = BybitRawHttpClient::build_path("/v5/order/realtime", &params).unwrap();
        let path_query = full_path.split('?').nth(1).unwrap_or("");

        let direct_query = serde_urlencoded::to_string(&params).unwrap();

        assert_eq!(path_query, direct_query);
    }

    /// Serializing the same params repeatedly yields the same query string each time.
    #[rstest]
    fn test_params_serialization_order() {
        #[derive(Serialize)]
        struct OrderParams {
            category: String,
            symbol: String,
            limit: u32,
        }

        let params = OrderParams {
            category: "spot".to_string(),
            symbol: "BTCUSDT".to_string(),
            limit: 50,
        };

        let first = serde_urlencoded::to_string(&params).unwrap();
        let second = serde_urlencoded::to_string(&params).unwrap();
        let third = serde_urlencoded::to_string(&params).unwrap();

        assert_eq!(first, second);
        assert_eq!(second, third);

        for expected in ["category=spot", "symbol=BTCUSDT", "limit=50"] {
            assert!(first.contains(expected));
        }
    }
}