1use std::{
37 collections::HashMap,
38 fmt::Debug,
39 num::NonZeroU32,
40 str::FromStr,
41 sync::{
42 Arc, LazyLock,
43 atomic::{AtomicBool, Ordering},
44 },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56 events::AccountState,
57 identifiers::{AccountId, ClientOrderId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59 reports::{FillReport, OrderStatusReport, PositionStatusReport},
60 types::{Price, Quantity},
61};
62use nautilus_network::{
63 http::{HttpClient, Method, StatusCode, USER_AGENT},
64 ratelimiter::quota::Quota,
65 retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73 error::OKXHttpError,
74 models::{
75 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78 OKXTransactionDetail,
79 },
80 query::{
81 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89 },
90};
91use crate::{
92 common::{
93 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94 credential::Credential,
95 enums::{
96 OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97 OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98 },
99 models::OKXInstrument,
100 parse::{
101 okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102 parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103 parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104 parse_spot_margin_position_from_balance, parse_trade_tick,
105 },
106 },
107 http::{
108 models::{OKXCandlestick, OKXTrade},
109 query::GetOrderParams,
110 },
111 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
114const OKX_SUCCESS_CODE: &str = "0";
115
116pub static OKX_REST_QUOTA: LazyLock<Quota> =
125 LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
126
127const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
128
129#[derive(Debug, Serialize, Deserialize)]
131pub struct OKXResponse<T> {
132 pub code: String,
134 pub msg: String,
136 pub data: Vec<T>,
138}
139
140pub struct OKXRawHttpClient {
146 base_url: String,
147 client: HttpClient,
148 credential: Option<Credential>,
149 retry_manager: RetryManager<OKXHttpError>,
150 cancellation_token: CancellationToken,
151 is_demo: bool,
152}
153
154impl Default for OKXRawHttpClient {
155 fn default() -> Self {
156 Self::new(None, Some(60), None, None, None, false, None)
157 .expect("Failed to create default OKXRawHttpClient")
158 }
159}
160
161impl Debug for OKXRawHttpClient {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 let credential = self.credential.as_ref().map(|_| "<redacted>");
164 f.debug_struct(stringify!(OKXRawHttpClient))
165 .field("base_url", &self.base_url)
166 .field("credential", &credential)
167 .finish_non_exhaustive()
168 }
169}
170
171impl OKXRawHttpClient {
172 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
173 vec![
174 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
175 (
176 "okx:/api/v5/account/balance".to_string(),
177 Quota::per_second(NonZeroU32::new(5).unwrap()),
178 ),
179 (
180 "okx:/api/v5/public/instruments".to_string(),
181 Quota::per_second(NonZeroU32::new(10).unwrap()),
182 ),
183 (
184 "okx:/api/v5/market/candles".to_string(),
185 Quota::per_second(NonZeroU32::new(50).unwrap()),
186 ),
187 (
188 "okx:/api/v5/market/history-candles".to_string(),
189 Quota::per_second(NonZeroU32::new(20).unwrap()),
190 ),
191 (
192 "okx:/api/v5/market/history-trades".to_string(),
193 Quota::per_second(NonZeroU32::new(30).unwrap()),
194 ),
195 (
196 "okx:/api/v5/trade/order".to_string(),
197 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
199 (
200 "okx:/api/v5/trade/orders-pending".to_string(),
201 Quota::per_second(NonZeroU32::new(20).unwrap()),
202 ),
203 (
204 "okx:/api/v5/trade/orders-history".to_string(),
205 Quota::per_second(NonZeroU32::new(20).unwrap()),
206 ),
207 (
208 "okx:/api/v5/trade/fills".to_string(),
209 Quota::per_second(NonZeroU32::new(30).unwrap()),
210 ),
211 (
212 "okx:/api/v5/trade/order-algo".to_string(),
213 Quota::per_second(NonZeroU32::new(10).unwrap()),
214 ),
215 (
216 "okx:/api/v5/trade/cancel-algos".to_string(),
217 Quota::per_second(NonZeroU32::new(10).unwrap()),
218 ),
219 ]
220 }
221
222 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
223 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
224 let route = format!("okx:{normalized}");
225
226 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
227 }
228
229 pub fn cancel_all_requests(&self) {
231 self.cancellation_token.cancel();
232 }
233
234 pub fn cancellation_token(&self) -> &CancellationToken {
236 &self.cancellation_token
237 }
238
239 pub fn new(
249 base_url: Option<String>,
250 timeout_secs: Option<u64>,
251 max_retries: Option<u32>,
252 retry_delay_ms: Option<u64>,
253 retry_delay_max_ms: Option<u64>,
254 is_demo: bool,
255 proxy_url: Option<String>,
256 ) -> Result<Self, OKXHttpError> {
257 let retry_config = RetryConfig {
258 max_retries: max_retries.unwrap_or(3),
259 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
260 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
261 backoff_factor: 2.0,
262 jitter_ms: 1000,
263 operation_timeout_ms: Some(60_000),
264 immediate_first: false,
265 max_elapsed_ms: Some(180_000),
266 };
267
268 let retry_manager = RetryManager::new(retry_config);
269
270 Ok(Self {
271 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
272 client: HttpClient::new(
273 Self::default_headers(is_demo),
274 vec![],
275 Self::rate_limiter_quotas(),
276 Some(*OKX_REST_QUOTA),
277 timeout_secs,
278 proxy_url,
279 )
280 .map_err(|e| {
281 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
282 })?,
283 credential: None,
284 retry_manager,
285 cancellation_token: CancellationToken::new(),
286 is_demo,
287 })
288 }
289
290 #[allow(clippy::too_many_arguments)]
297 pub fn with_credentials(
298 api_key: String,
299 api_secret: String,
300 api_passphrase: String,
301 base_url: String,
302 timeout_secs: Option<u64>,
303 max_retries: Option<u32>,
304 retry_delay_ms: Option<u64>,
305 retry_delay_max_ms: Option<u64>,
306 is_demo: bool,
307 proxy_url: Option<String>,
308 ) -> Result<Self, OKXHttpError> {
309 let retry_config = RetryConfig {
310 max_retries: max_retries.unwrap_or(3),
311 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
312 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
313 backoff_factor: 2.0,
314 jitter_ms: 1000,
315 operation_timeout_ms: Some(60_000),
316 immediate_first: false,
317 max_elapsed_ms: Some(180_000),
318 };
319
320 let retry_manager = RetryManager::new(retry_config);
321
322 Ok(Self {
323 base_url,
324 client: HttpClient::new(
325 Self::default_headers(is_demo),
326 vec![],
327 Self::rate_limiter_quotas(),
328 Some(*OKX_REST_QUOTA),
329 timeout_secs,
330 proxy_url,
331 )
332 .map_err(|e| {
333 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
334 })?,
335 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
336 retry_manager,
337 cancellation_token: CancellationToken::new(),
338 is_demo,
339 })
340 }
341
342 fn default_headers(is_demo: bool) -> HashMap<String, String> {
344 let mut headers =
345 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
346
347 if is_demo {
348 headers.insert("x-simulated-trading".to_string(), "1".to_string());
349 }
350
351 headers
352 }
353
354 fn sign_request(
361 &self,
362 method: &Method,
363 path: &str,
364 body: Option<&[u8]>,
365 ) -> Result<HashMap<String, String>, OKXHttpError> {
366 let credential = match self.credential.as_ref() {
367 Some(c) => c,
368 None => return Err(OKXHttpError::MissingCredentials),
369 };
370
371 let api_key = credential.api_key.to_string();
372 let api_passphrase = credential.api_passphrase.clone();
373
374 let now = Utc::now();
376 let millis = now.timestamp_subsec_millis();
377 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
378 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
379
380 let mut headers = HashMap::new();
381 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
382 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
383 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
384 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
385
386 Ok(headers)
387 }
388
389 async fn send_request<T: DeserializeOwned, P: Serialize>(
405 &self,
406 method: Method,
407 path: &str,
408 params: Option<&P>,
409 body: Option<Vec<u8>>,
410 authenticate: bool,
411 ) -> Result<Vec<T>, OKXHttpError> {
412 let url = format!("{}{path}", self.base_url);
413
414 let rate_keys: Vec<String> = Self::rate_limit_keys(path)
416 .into_iter()
417 .map(|k| k.to_string())
418 .collect();
419
420 let operation = || {
421 let url = url.clone();
422 let method = method.clone();
423 let body = body.clone();
424 let rate_keys = rate_keys.clone();
425
426 async move {
427 let query_string = if let Some(p) = params {
429 serde_urlencoded::to_string(p).map_err(|e| {
430 OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
431 })?
432 } else {
433 String::new()
434 };
435
436 let full_path = if query_string.is_empty() {
438 path.to_string()
439 } else {
440 format!("{path}?{query_string}")
441 };
442
443 let mut headers = if authenticate {
444 self.sign_request(&method, &full_path, body.as_deref())?
445 } else {
446 HashMap::new()
447 };
448
449 if body.is_some() {
451 headers.insert("Content-Type".to_string(), "application/json".to_string());
452 }
453
454 let resp = self
455 .client
456 .request_with_params(
457 method.clone(),
458 url,
459 params,
460 Some(headers),
461 body,
462 None,
463 Some(rate_keys),
464 )
465 .await?;
466
467 log::trace!("Response: {resp:?}");
468
469 if resp.status.is_success() {
470 let okx_response: OKXResponse<T> =
471 serde_json::from_slice(&resp.body).map_err(|e| {
472 log::error!("Failed to deserialize OKXResponse: {e}");
473 OKXHttpError::JsonError(e.to_string())
474 })?;
475
476 if okx_response.code != OKX_SUCCESS_CODE {
477 return Err(OKXHttpError::OkxError {
478 error_code: okx_response.code,
479 message: okx_response.msg,
480 });
481 }
482
483 Ok(okx_response.data)
484 } else {
485 let error_body = String::from_utf8_lossy(&resp.body);
486 if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
487 log::debug!("HTTP 404 with body: {error_body}");
488 } else {
489 log::error!(
490 "HTTP error {} with body: {error_body}",
491 resp.status.as_str()
492 );
493 }
494
495 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
496 return Err(OKXHttpError::OkxError {
497 error_code: parsed_error.code,
498 message: parsed_error.msg,
499 });
500 }
501
502 Err(OKXHttpError::UnexpectedStatus {
503 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
504 body: error_body.to_string(),
505 })
506 }
507 }
508 };
509
510 let should_retry = |error: &OKXHttpError| -> bool {
519 match error {
520 OKXHttpError::HttpClientError(_) => true,
521 OKXHttpError::UnexpectedStatus { status, .. } => {
522 status.as_u16() >= 500 || status.as_u16() == 429
523 }
524 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
525 _ => false,
526 }
527 };
528
529 let create_error = |msg: String| -> OKXHttpError {
530 if msg == "canceled" {
531 OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
532 } else {
533 OKXHttpError::ValidationError(msg)
534 }
535 };
536
537 self.retry_manager
538 .execute_with_retry_with_cancel(
539 path,
540 operation,
541 should_retry,
542 create_error,
543 &self.cancellation_token,
544 )
545 .await
546 }
547
548 pub async fn set_position_mode(
559 &self,
560 params: SetPositionModeParams,
561 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
562 let path = "/api/v5/account/set-position-mode";
563 let body = serde_json::to_vec(¶ms)?;
564 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
565 .await
566 }
567
568 pub async fn get_position_tiers(
579 &self,
580 params: GetPositionTiersParams,
581 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
582 self.send_request(
583 Method::GET,
584 "/api/v5/public/position-tiers",
585 Some(¶ms),
586 None,
587 false,
588 )
589 .await
590 }
591
592 pub async fn get_instruments(
603 &self,
604 params: GetInstrumentsParams,
605 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
606 self.send_request(
607 Method::GET,
608 "/api/v5/public/instruments",
609 Some(¶ms),
610 None,
611 false,
612 )
613 .await
614 }
615
616 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
630 let response: Vec<OKXServerTime> = self
631 .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
632 .await?;
633 response
634 .first()
635 .map(|t| t.ts)
636 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
637 }
638
639 pub async fn get_mark_price(
653 &self,
654 params: GetMarkPriceParams,
655 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
656 self.send_request(
657 Method::GET,
658 "/api/v5/public/mark-price",
659 Some(¶ms),
660 None,
661 false,
662 )
663 .await
664 }
665
666 pub async fn get_index_tickers(
676 &self,
677 params: GetIndexTickerParams,
678 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
679 self.send_request(
680 Method::GET,
681 "/api/v5/market/index-tickers",
682 Some(¶ms),
683 None,
684 false,
685 )
686 .await
687 }
688
689 pub async fn get_history_trades(
699 &self,
700 params: GetTradesParams,
701 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
702 self.send_request(
703 Method::GET,
704 "/api/v5/market/history-trades",
705 Some(¶ms),
706 None,
707 false,
708 )
709 .await
710 }
711
712 pub async fn get_candles(
722 &self,
723 params: GetCandlesticksParams,
724 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
725 self.send_request(
726 Method::GET,
727 "/api/v5/market/candles",
728 Some(¶ms),
729 None,
730 false,
731 )
732 .await
733 }
734
735 pub async fn get_history_candles(
745 &self,
746 params: GetCandlesticksParams,
747 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
748 self.send_request(
749 Method::GET,
750 "/api/v5/market/history-candles",
751 Some(¶ms),
752 None,
753 false,
754 )
755 .await
756 }
757
758 pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
769 let path = "/api/v5/account/balance";
770 self.send_request::<_, ()>(Method::GET, path, None, None, true)
771 .await
772 }
773
774 pub async fn get_trade_fee(
786 &self,
787 params: GetTradeFeeParams,
788 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
789 self.send_request(
790 Method::GET,
791 "/api/v5/account/trade-fee",
792 Some(¶ms),
793 None,
794 true,
795 )
796 .await
797 }
798
799 pub async fn get_order(
809 &self,
810 params: GetOrderParams,
811 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
812 self.send_request(
813 Method::GET,
814 "/api/v5/trade/order",
815 Some(¶ms),
816 None,
817 true,
818 )
819 .await
820 }
821
822 pub async fn get_orders_pending(
832 &self,
833 params: GetOrderListParams,
834 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
835 self.send_request(
836 Method::GET,
837 "/api/v5/trade/orders-pending",
838 Some(¶ms),
839 None,
840 true,
841 )
842 .await
843 }
844
845 pub async fn get_orders_history(
855 &self,
856 params: GetOrderHistoryParams,
857 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
858 self.send_request(
859 Method::GET,
860 "/api/v5/trade/orders-history",
861 Some(¶ms),
862 None,
863 true,
864 )
865 .await
866 }
867
868 pub async fn get_order_algo_pending(
874 &self,
875 params: GetAlgoOrdersParams,
876 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
877 self.send_request(
878 Method::GET,
879 "/api/v5/trade/order-algo-pending",
880 Some(¶ms),
881 None,
882 true,
883 )
884 .await
885 }
886
887 pub async fn get_order_algo_history(
893 &self,
894 params: GetAlgoOrdersParams,
895 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
896 self.send_request(
897 Method::GET,
898 "/api/v5/trade/order-algo-history",
899 Some(¶ms),
900 None,
901 true,
902 )
903 .await
904 }
905
906 pub async fn get_fills(
916 &self,
917 params: GetTransactionDetailsParams,
918 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
919 self.send_request(
920 Method::GET,
921 "/api/v5/trade/fills",
922 Some(¶ms),
923 None,
924 true,
925 )
926 .await
927 }
928
929 pub async fn get_positions(
941 &self,
942 params: GetPositionsParams,
943 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
944 self.send_request(
945 Method::GET,
946 "/api/v5/account/positions",
947 Some(¶ms),
948 None,
949 true,
950 )
951 .await
952 }
953
954 pub async fn get_positions_history(
964 &self,
965 params: GetPositionsHistoryParams,
966 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
967 self.send_request(
968 Method::GET,
969 "/api/v5/account/positions-history",
970 Some(¶ms),
971 None,
972 true,
973 )
974 .await
975 }
976}
977
978#[derive(Debug)]
983#[cfg_attr(
984 feature = "python",
985 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx")
986)]
987pub struct OKXHttpClient {
988 pub(crate) inner: Arc<OKXRawHttpClient>,
989 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
990 cache_initialized: AtomicBool,
991}
992
993impl Clone for OKXHttpClient {
994 fn clone(&self) -> Self {
995 let cache_initialized = AtomicBool::new(false);
996
997 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
998 if is_initialized {
999 cache_initialized.store(true, Ordering::Release);
1000 }
1001
1002 Self {
1003 inner: self.inner.clone(),
1004 instruments_cache: self.instruments_cache.clone(),
1005 cache_initialized,
1006 }
1007 }
1008}
1009
1010impl Default for OKXHttpClient {
1011 fn default() -> Self {
1012 Self::new(None, Some(60), None, None, None, false, None)
1013 .expect("Failed to create default OKXHttpClient")
1014 }
1015}
1016
1017impl OKXHttpClient {
1018 pub fn new(
1028 base_url: Option<String>,
1029 timeout_secs: Option<u64>,
1030 max_retries: Option<u32>,
1031 retry_delay_ms: Option<u64>,
1032 retry_delay_max_ms: Option<u64>,
1033 is_demo: bool,
1034 proxy_url: Option<String>,
1035 ) -> anyhow::Result<Self> {
1036 Ok(Self {
1037 inner: Arc::new(OKXRawHttpClient::new(
1038 base_url,
1039 timeout_secs,
1040 max_retries,
1041 retry_delay_ms,
1042 retry_delay_max_ms,
1043 is_demo,
1044 proxy_url,
1045 )?),
1046 instruments_cache: Arc::new(DashMap::new()),
1047 cache_initialized: AtomicBool::new(false),
1048 })
1049 }
1050
1051 fn generate_ts_init(&self) -> UnixNanos {
1053 get_atomic_clock_realtime().get_time_ns()
1054 }
1055
1056 pub fn from_env() -> anyhow::Result<Self> {
1063 Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1064 }
1065
1066 #[allow(clippy::too_many_arguments)]
1073 pub fn with_credentials(
1074 api_key: Option<String>,
1075 api_secret: Option<String>,
1076 api_passphrase: Option<String>,
1077 base_url: Option<String>,
1078 timeout_secs: Option<u64>,
1079 max_retries: Option<u32>,
1080 retry_delay_ms: Option<u64>,
1081 retry_delay_max_ms: Option<u64>,
1082 is_demo: bool,
1083 proxy_url: Option<String>,
1084 ) -> anyhow::Result<Self> {
1085 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1086 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1087 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1088 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1089
1090 Ok(Self {
1091 inner: Arc::new(OKXRawHttpClient::with_credentials(
1092 api_key,
1093 api_secret,
1094 api_passphrase,
1095 base_url,
1096 timeout_secs,
1097 max_retries,
1098 retry_delay_ms,
1099 retry_delay_max_ms,
1100 is_demo,
1101 proxy_url,
1102 )?),
1103 instruments_cache: Arc::new(DashMap::new()),
1104 cache_initialized: AtomicBool::new(false),
1105 })
1106 }
1107
1108 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1114 self.instruments_cache
1115 .get(&symbol)
1116 .map(|entry| entry.value().clone())
1117 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1118 }
1119
1120 pub fn cancel_all_requests(&self) {
1122 self.inner.cancel_all_requests();
1123 }
1124
1125 pub fn cancellation_token(&self) -> &CancellationToken {
1127 self.inner.cancellation_token()
1128 }
1129
1130 pub fn base_url(&self) -> &str {
1132 self.inner.base_url.as_str()
1133 }
1134
1135 pub fn api_key(&self) -> Option<&str> {
1137 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1138 }
1139
1140 #[must_use]
1142 pub fn api_key_masked(&self) -> Option<String> {
1143 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1144 }
1145
1146 #[must_use]
1148 pub fn is_demo(&self) -> bool {
1149 self.inner.is_demo
1150 }
1151
1152 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1160 self.inner.get_server_time().await
1161 }
1162
1163 #[must_use]
1167 pub fn is_initialized(&self) -> bool {
1168 self.cache_initialized.load(Ordering::Acquire)
1169 }
1170
1171 #[must_use]
1174 pub fn get_cached_symbols(&self) -> Vec<String> {
1175 self.instruments_cache
1176 .iter()
1177 .map(|entry| entry.key().to_string())
1178 .collect()
1179 }
1180
1181 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1185 for inst in instruments {
1186 self.instruments_cache
1187 .insert(inst.raw_symbol().inner(), inst);
1188 }
1189 self.cache_initialized.store(true, Ordering::Release);
1190 }
1191
1192 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1196 self.instruments_cache
1197 .insert(instrument.raw_symbol().inner(), instrument);
1198 self.cache_initialized.store(true, Ordering::Release);
1199 }
1200
1201 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1203 self.instruments_cache
1204 .get(symbol)
1205 .map(|entry| entry.value().clone())
1206 }
1207
1208 pub async fn request_account_state(
1214 &self,
1215 account_id: AccountId,
1216 ) -> anyhow::Result<AccountState> {
1217 let resp = self
1218 .inner
1219 .get_balance()
1220 .await
1221 .map_err(|e| anyhow::anyhow!(e))?;
1222
1223 let ts_init = self.generate_ts_init();
1224 let raw = resp
1225 .first()
1226 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1227 let account_state = parse_account_state(raw, account_id, ts_init)?;
1228
1229 Ok(account_state)
1230 }
1231
1232 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1245 let mut params = SetPositionModeParamsBuilder::default();
1246 params.pos_mode(position_mode);
1247 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1248
1249 match self.inner.set_position_mode(params).await {
1250 Ok(_) => Ok(()),
1251 Err(e) => {
1252 if let OKXHttpError::OkxError {
1253 error_code,
1254 message,
1255 } = &e
1256 && error_code == "50115"
1257 {
1258 log::warn!(
1259 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1260 );
1261 return Ok(()); }
1263 anyhow::bail!(e)
1264 }
1265 }
1266 }
1267
1268 pub async fn request_instruments(
1274 &self,
1275 instrument_type: OKXInstrumentType,
1276 instrument_family: Option<String>,
1277 ) -> anyhow::Result<Vec<InstrumentAny>> {
1278 let mut params = GetInstrumentsParamsBuilder::default();
1279 params.inst_type(instrument_type);
1280
1281 if let Some(family) = instrument_family.clone() {
1282 params.inst_family(family);
1283 }
1284
1285 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1286
1287 let resp = self
1288 .inner
1289 .get_instruments(params)
1290 .await
1291 .map_err(|e| anyhow::anyhow!(e))?;
1292
1293 let fee_rate_opt = {
1294 let fee_params = GetTradeFeeParams {
1295 inst_type: instrument_type,
1296 uly: None,
1297 inst_family: instrument_family,
1298 };
1299
1300 match self.inner.get_trade_fee(fee_params).await {
1301 Ok(rates) => rates.into_iter().next(),
1302 Err(OKXHttpError::MissingCredentials) => {
1303 log::debug!("Missing credentials for fee rates, using None");
1304 None
1305 }
1306 Err(e) => {
1307 log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1308 None
1309 }
1310 }
1311 };
1312
1313 let ts_init = self.generate_ts_init();
1314
1315 let mut instruments: Vec<InstrumentAny> = Vec::new();
1316 for inst in &resp {
1317 if inst.state == OKXInstrumentStatus::Preopen {
1320 continue;
1321 }
1322
1323 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1328 let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1329 let (maker_str, taker_str) = if is_usdt_margined {
1330 (&fee_rate.maker_u, &fee_rate.taker_u)
1331 } else {
1332 (&fee_rate.maker, &fee_rate.taker)
1333 };
1334
1335 let maker = if maker_str.is_empty() {
1336 None
1337 } else {
1338 Decimal::from_str(maker_str).ok().map(|v| -v)
1339 };
1340 let taker = if taker_str.is_empty() {
1341 None
1342 } else {
1343 Decimal::from_str(taker_str).ok().map(|v| -v)
1344 };
1345
1346 (maker, taker)
1347 } else {
1348 (None, None)
1349 };
1350
1351 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1352 Ok(Some(instrument_any)) => {
1353 instruments.push(instrument_any);
1354 }
1355 Ok(None) => {
1356 }
1358 Err(e) => {
1359 log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1360 }
1361 }
1362 }
1363
1364 Ok(instruments)
1365 }
1366
1367 pub async fn request_instrument(
1378 &self,
1379 instrument_id: InstrumentId,
1380 ) -> anyhow::Result<InstrumentAny> {
1381 let symbol = instrument_id.symbol.as_str();
1382 let instrument_type = okx_instrument_type_from_symbol(symbol);
1383
1384 let mut params = GetInstrumentsParamsBuilder::default();
1385 params.inst_type(instrument_type);
1386 params.inst_id(symbol);
1387
1388 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1389
1390 let resp = self
1391 .inner
1392 .get_instruments(params)
1393 .await
1394 .map_err(|e| anyhow::anyhow!(e))?;
1395
1396 let raw_inst = resp
1397 .first()
1398 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1399
1400 if raw_inst.state == OKXInstrumentStatus::Preopen {
1402 anyhow::bail!("Instrument {symbol} is in pre-open state");
1403 }
1404
1405 let fee_rate_opt = {
1406 let fee_params = GetTradeFeeParams {
1407 inst_type: instrument_type,
1408 uly: None,
1409 inst_family: None,
1410 };
1411
1412 match self.inner.get_trade_fee(fee_params).await {
1413 Ok(rates) => rates.into_iter().next(),
1414 Err(OKXHttpError::MissingCredentials) => {
1415 log::debug!("Missing credentials for fee rates, using None");
1416 None
1417 }
1418 Err(e) => {
1419 log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1420 None
1421 }
1422 }
1423 };
1424
1425 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1429 let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1430 let (maker_str, taker_str) = if is_usdt_margined {
1431 (&fee_rate.maker_u, &fee_rate.taker_u)
1432 } else {
1433 (&fee_rate.maker, &fee_rate.taker)
1434 };
1435
1436 let maker = if maker_str.is_empty() {
1437 None
1438 } else {
1439 Decimal::from_str(maker_str).ok().map(|v| -v)
1440 };
1441 let taker = if taker_str.is_empty() {
1442 None
1443 } else {
1444 Decimal::from_str(taker_str).ok().map(|v| -v)
1445 };
1446
1447 (maker, taker)
1448 } else {
1449 (None, None)
1450 };
1451
1452 let ts_init = self.generate_ts_init();
1453 let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1454 .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1455
1456 self.cache_instrument(instrument.clone());
1457
1458 Ok(instrument)
1459 }
1460
1461 pub async fn request_mark_price(
1467 &self,
1468 instrument_id: InstrumentId,
1469 ) -> anyhow::Result<MarkPriceUpdate> {
1470 let mut params = GetMarkPriceParamsBuilder::default();
1471 params.inst_id(instrument_id.symbol.inner());
1472 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1473
1474 let resp = self
1475 .inner
1476 .get_mark_price(params)
1477 .await
1478 .map_err(|e| anyhow::anyhow!(e))?;
1479
1480 let raw = resp
1481 .first()
1482 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1483 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1484 let ts_init = self.generate_ts_init();
1485
1486 let mark_price =
1487 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1488 .map_err(|e| anyhow::anyhow!(e))?;
1489 Ok(mark_price)
1490 }
1491
1492 pub async fn request_index_price(
1498 &self,
1499 instrument_id: InstrumentId,
1500 ) -> anyhow::Result<IndexPriceUpdate> {
1501 let mut params = GetIndexTickerParamsBuilder::default();
1502 params.inst_id(instrument_id.symbol.inner());
1503 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1504
1505 let resp = self
1506 .inner
1507 .get_index_tickers(params)
1508 .await
1509 .map_err(|e| anyhow::anyhow!(e))?;
1510
1511 let raw = resp
1512 .first()
1513 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1514 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1515 let ts_init = self.generate_ts_init();
1516
1517 let index_price =
1518 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1519 .map_err(|e| anyhow::anyhow!(e))?;
1520 Ok(index_price)
1521 }
1522
1523 pub async fn request_trades(
1533 &self,
1534 instrument_id: InstrumentId,
1535 start: Option<DateTime<Utc>>,
1536 end: Option<DateTime<Utc>>,
1537 limit: Option<u32>,
1538 ) -> anyhow::Result<Vec<TradeTick>> {
1539 const OKX_TRADES_MAX_LIMIT: u32 = 100;
1540 const MAX_PAGES: usize = 500;
1541 const MAX_CONSECUTIVE_EMPTY: usize = 3;
1542
1543 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1544 enum Mode {
1545 Latest,
1546 Backward,
1547 Range,
1548 }
1549
1550 let limit = if limit == Some(0) { None } else { limit };
1551
1552 if let (Some(s), Some(e)) = (start, end) {
1553 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1554 }
1555
1556 let now = Utc::now();
1557
1558 if let Some(s) = start
1559 && s > now
1560 {
1561 return Ok(Vec::new());
1562 }
1563
1564 let end = if let Some(e) = end
1565 && e > now
1566 {
1567 Some(now)
1568 } else {
1569 end
1570 };
1571
1572 let mode = match (start, end) {
1573 (None, None) => Mode::Latest,
1574 (Some(_), None) => Mode::Backward,
1575 (None, Some(_)) => Mode::Backward,
1576 (Some(_), Some(_)) => Mode::Range,
1577 };
1578
1579 let start_ms = start.map(|s| s.timestamp_millis());
1580 let end_ms = end.map(|e| e.timestamp_millis());
1581
1582 let ts_init = self.generate_ts_init();
1583 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1584
1585 if matches!(mode, Mode::Backward | Mode::Range) {
1588 let mut before_trade_id: Option<String> = None;
1589 let mut pages = 0usize;
1590 let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1591 let mut seen_trades: std::collections::HashSet<(String, i64)> =
1592 std::collections::HashSet::new();
1593 let mut unique_count = 0usize;
1594 let mut consecutive_empty_pages = 0usize;
1595
1596 let effective_limit = if start.is_some() {
1599 limit.unwrap_or(u32::MAX)
1600 } else {
1601 limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1602 };
1603
1604 log::debug!(
1605 "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
1606 );
1607
1608 loop {
1609 if pages >= MAX_PAGES {
1610 log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
1611 break;
1612 }
1613
1614 if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1615 log::debug!("Reached effective limit: unique_count={unique_count}");
1616 break;
1617 }
1618
1619 let remaining = (effective_limit as usize).saturating_sub(unique_count);
1620 let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1621
1622 log::debug!(
1623 "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1624 pages + 1,
1625 before_trade_id,
1626 page_cap,
1627 unique_count
1628 );
1629
1630 let mut params_builder = GetTradesParamsBuilder::default();
1631 params_builder
1632 .inst_id(instrument_id.symbol.inner())
1633 .limit(page_cap)
1634 .pagination_type(1);
1635
1636 if let Some(ref before_id) = before_trade_id {
1638 params_builder.after(before_id.clone());
1639 }
1640
1641 let params = params_builder.build().map_err(anyhow::Error::new)?;
1642 let raw = self
1643 .inner
1644 .get_history_trades(params)
1645 .await
1646 .map_err(anyhow::Error::new)?;
1647
1648 log::debug!("Received {} raw trades from API", raw.len());
1649
1650 if !raw.is_empty() {
1651 let first_id = &raw.first().unwrap().trade_id;
1652 let last_id = &raw.last().unwrap().trade_id;
1653 log::debug!(
1654 "Raw response trade ID range: first={first_id} (newest), last={last_id} (oldest)"
1655 );
1656 }
1657
1658 if raw.is_empty() {
1659 log::debug!("API returned empty page, stopping pagination");
1660 break;
1661 }
1662
1663 pages += 1;
1664
1665 let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1666 let mut hit_start_boundary = false;
1667 let mut filtered_out = 0usize;
1668 let mut duplicates = 0usize;
1669
1670 for r in &raw {
1671 match parse_trade_tick(
1672 r,
1673 instrument_id,
1674 inst.price_precision(),
1675 inst.size_precision(),
1676 ts_init,
1677 ) {
1678 Ok(trade) => {
1679 let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1680
1681 if let Some(e_ms) = end_ms
1682 && ts_ms > e_ms
1683 {
1684 filtered_out += 1;
1685 continue;
1686 }
1687
1688 if let Some(s_ms) = start_ms
1689 && ts_ms < s_ms
1690 {
1691 hit_start_boundary = true;
1692 filtered_out += 1;
1693 break;
1694 }
1695
1696 let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1697 if seen_trades.insert(trade_key) {
1698 unique_count += 1;
1699 page_trades.push(trade);
1700 } else {
1701 duplicates += 1;
1702 }
1703 }
1704 Err(e) => log::error!("{e}"),
1705 }
1706 }
1707
1708 log::debug!(
1709 "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1710 pages,
1711 page_trades.len(),
1712 filtered_out,
1713 duplicates,
1714 hit_start_boundary
1715 );
1716
1717 let oldest_trade_id = if page_trades.is_empty() {
1719 if unique_count > 0 {
1722 consecutive_empty_pages += 1;
1723 if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1724 log::debug!(
1725 "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
1726 );
1727 break;
1728 }
1729 }
1730 raw.last().map(|t| {
1732 let id = t.trade_id.to_string();
1733 log::debug!(
1734 "Setting cursor from raw response (no unique trades): oldest_id={id}"
1735 );
1736 id
1737 })
1738 } else {
1739 let oldest_id = page_trades.last().map(|t| {
1741 let id = t.trade_id.to_string();
1742 log::debug!(
1743 "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1744 id,
1745 t.ts_event.as_i64()
1746 );
1747 id
1748 });
1749 page_trades.reverse();
1750 page_results.push(page_trades);
1751 consecutive_empty_pages = 0;
1752 oldest_id
1753 };
1754
1755 if let Some(ref old_id) = before_trade_id
1756 && oldest_trade_id.as_ref() == Some(old_id)
1757 {
1758 break;
1759 }
1760
1761 if oldest_trade_id.is_none() {
1762 break;
1763 }
1764
1765 before_trade_id = oldest_trade_id;
1766
1767 if hit_start_boundary {
1768 break;
1769 }
1770
1771 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1772 }
1773
1774 log::debug!(
1775 "Pagination complete: {pages} pages, {unique_count} unique trades collected"
1776 );
1777
1778 let mut out: Vec<TradeTick> = Vec::new();
1779 for page in page_results.into_iter().rev() {
1780 out.extend(page);
1781 }
1782
1783 let mut dedup_keys = std::collections::HashSet::new();
1785 let pre_dedup_len = out.len();
1786 out.retain(|trade| {
1787 dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1788 });
1789
1790 if out.len() < pre_dedup_len {
1791 log::debug!(
1792 "Removed {} duplicate trades during final dedup",
1793 pre_dedup_len - out.len()
1794 );
1795 }
1796
1797 if let Some(lim) = limit
1798 && lim > 0
1799 && out.len() > lim as usize
1800 {
1801 let excess = out.len() - lim as usize;
1802 log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
1803 out.drain(0..excess);
1804 }
1805
1806 log::debug!("Returning {} trades", out.len());
1807 return Ok(out);
1808 }
1809
1810 let req_limit = limit
1811 .unwrap_or(OKX_TRADES_MAX_LIMIT)
1812 .min(OKX_TRADES_MAX_LIMIT);
1813 let params = GetTradesParamsBuilder::default()
1814 .inst_id(instrument_id.symbol.inner())
1815 .limit(req_limit)
1816 .build()
1817 .map_err(anyhow::Error::new)?;
1818
1819 let raw = self
1820 .inner
1821 .get_history_trades(params)
1822 .await
1823 .map_err(anyhow::Error::new)?;
1824
1825 let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1826 for r in &raw {
1827 match parse_trade_tick(
1828 r,
1829 instrument_id,
1830 inst.price_precision(),
1831 inst.size_precision(),
1832 ts_init,
1833 ) {
1834 Ok(trade) => trades.push(trade),
1835 Err(e) => log::error!("{e}"),
1836 }
1837 }
1838
1839 trades.reverse();
1841
1842 if let Some(lim) = limit
1843 && lim > 0
1844 && trades.len() > lim as usize
1845 {
1846 trades.drain(0..trades.len() - lim as usize);
1847 }
1848
1849 Ok(trades)
1850 }
1851
1852 pub async fn request_bars(
1897 &self,
1898 bar_type: BarType,
1899 start: Option<DateTime<Utc>>,
1900 mut end: Option<DateTime<Utc>>,
1901 limit: Option<u32>,
1902 ) -> anyhow::Result<Vec<Bar>> {
1903 const HISTORY_SPLIT_DAYS: i64 = 100;
1904 const MAX_PAGES_SOFT: usize = 500;
1905
1906 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1907 enum Mode {
1908 Latest,
1909 Backward,
1910 Range,
1911 }
1912
1913 let limit = if limit == Some(0) { None } else { limit };
1914
1915 anyhow::ensure!(
1916 bar_type.aggregation_source() == AggregationSource::External,
1917 "Only EXTERNAL aggregation is supported"
1918 );
1919
1920 if let (Some(s), Some(e)) = (start, end) {
1921 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1922 }
1923
1924 let now = Utc::now();
1925
1926 if let Some(s) = start
1927 && s > now
1928 {
1929 return Ok(Vec::new());
1930 }
1931 if let Some(e) = end
1932 && e > now
1933 {
1934 end = Some(now);
1935 }
1936
1937 let spec = bar_type.spec();
1938 let step = spec.step.get();
1939 let bar_param = match spec.aggregation {
1940 BarAggregation::Second => format!("{step}s"),
1941 BarAggregation::Minute => format!("{step}m"),
1942 BarAggregation::Hour => format!("{step}H"),
1943 BarAggregation::Day => format!("{step}D"),
1944 BarAggregation::Week => format!("{step}W"),
1945 BarAggregation::Month => format!("{step}M"),
1946 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1947 };
1948
1949 let slot_ms: i64 = match spec.aggregation {
1950 BarAggregation::Second => (step as i64) * 1_000,
1951 BarAggregation::Minute => (step as i64) * 60_000,
1952 BarAggregation::Hour => (step as i64) * 3_600_000,
1953 BarAggregation::Day => (step as i64) * 86_400_000,
1954 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1955 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1956 _ => unreachable!("Unsupported aggregation should have been caught above"),
1957 };
1958 let slot_ns: i64 = slot_ms * 1_000_000;
1959
1960 let mode = match (start, end) {
1961 (None, None) => Mode::Latest,
1962 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
1964 (Some(_), Some(_)) => Mode::Range,
1965 };
1966
1967 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1968 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1969
1970 let start_ms = start.map(|s| {
1972 let ms = s.timestamp_millis();
1973 if slot_ms > 0 {
1974 (ms / slot_ms) * slot_ms } else {
1976 ms
1977 }
1978 });
1979 let end_ms = end.map(|e| {
1980 let ms = e.timestamp_millis();
1981 if slot_ms > 0 {
1982 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
1984 ms
1985 }
1986 });
1987 let now_ms = now.timestamp_millis();
1988
1989 let symbol = bar_type.instrument_id().symbol;
1990 let inst = self.instrument_from_cache(symbol.inner())?;
1991
1992 let mut out: Vec<Bar> = Vec::new();
1993 let mut pages = 0usize;
1994
1995 let mut after_ms: Option<i64> = match mode {
2000 Mode::Range => end_ms.or(Some(now_ms)), _ => None,
2002 };
2003 let mut before_ms: Option<i64> = match mode {
2004 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2005 Mode::Range => start_ms, Mode::Latest => None,
2007 };
2008
2009 let mut forward_prepend_mode = matches!(mode, Mode::Range);
2011
2012 if matches!(mode, Mode::Backward | Mode::Range)
2016 && let Some(b) = before_ms
2017 {
2018 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
2024 before_ms = Some(now_ms.saturating_sub(buffer_ms));
2025 }
2026 }
2027
2028 let mut have_latest_first_page = false;
2029 let mut progressless_loops = 0u8;
2030
2031 loop {
2032 if let Some(lim) = limit
2033 && lim > 0
2034 && out.len() >= lim as usize
2035 {
2036 break;
2037 }
2038 if pages >= MAX_PAGES_SOFT {
2039 break;
2040 }
2041
2042 let pivot_ms = if let Some(a) = after_ms {
2043 a
2044 } else if let Some(b) = before_ms {
2045 b
2046 } else {
2047 now_ms
2048 };
2049 let age_ms = now_ms.saturating_sub(pivot_ms);
2055 let age_hours = age_ms / (60 * 60 * 1000);
2056 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
2059 let remaining = limit
2060 .filter(|&l| l > 0) .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2062 let page_cap = remaining.min(page_ceiling);
2063
2064 let mut p = GetCandlesticksParamsBuilder::default();
2065 p.inst_id(symbol.as_str())
2066 .bar(&bar_param)
2067 .limit(page_cap as u32);
2068
2069 let mut req_used_before = false;
2071
2072 match mode {
2073 Mode::Latest => {
2074 if have_latest_first_page && let Some(b) = before_ms {
2075 p.before_ms(b);
2076 req_used_before = true;
2077 }
2078 }
2079 Mode::Backward => {
2080 if let Some(b) = before_ms {
2082 p.after_ms(b);
2083 }
2084 }
2085 Mode::Range => {
2086 if let Some(a) = after_ms {
2089 p.after_ms(a);
2090 }
2091 if let Some(b) = before_ms {
2092 p.before_ms(b);
2093 req_used_before = true;
2094 }
2095 }
2096 }
2097
2098 let params = p.build().map_err(anyhow::Error::new)?;
2099
2100 let mut raw = if using_history {
2101 self.inner
2102 .get_history_candles(params.clone())
2103 .await
2104 .map_err(anyhow::Error::new)?
2105 } else {
2106 self.inner
2107 .get_candles(params.clone())
2108 .await
2109 .map_err(anyhow::Error::new)?
2110 };
2111
2112 if raw.is_empty() {
2114 if matches!(mode, Mode::Latest)
2116 && have_latest_first_page
2117 && !using_history
2118 && let Some(b) = before_ms
2119 {
2120 let mut p2 = GetCandlesticksParamsBuilder::default();
2121 p2.inst_id(symbol.as_str())
2122 .bar(&bar_param)
2123 .limit(page_cap as u32);
2124 p2.before_ms(b);
2125 let params2 = p2.build().map_err(anyhow::Error::new)?;
2126 let raw2 = self
2127 .inner
2128 .get_history_candles(params2)
2129 .await
2130 .map_err(anyhow::Error::new)?;
2131 if raw2.is_empty() {
2132 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2134 before_ms = Some(b.saturating_sub(jump));
2135 progressless_loops = progressless_loops.saturating_add(1);
2136 if progressless_loops >= 3 {
2137 break;
2138 }
2139 continue;
2140 } else {
2141 raw = raw2;
2142 }
2143 }
2144
2145 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2149 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2150 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2151
2152 let mut p2 = GetCandlesticksParamsBuilder::default();
2153 p2.inst_id(symbol.as_str())
2154 .bar(&bar_param)
2155 .limit(page_cap as u32)
2156 .before_ms(pivot_back);
2157 let params2 = p2.build().map_err(anyhow::Error::new)?;
2158 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2159 > HISTORY_SPLIT_DAYS
2160 {
2161 self.inner.get_history_candles(params2).await
2162 } else {
2163 self.inner.get_candles(params2).await
2164 }
2165 .map_err(anyhow::Error::new)?;
2166 if raw2.is_empty() {
2167 break;
2168 } else {
2169 raw = raw2;
2170 forward_prepend_mode = true;
2171 req_used_before = true;
2172 }
2173 }
2174
2175 if raw.is_empty()
2177 && matches!(mode, Mode::Latest)
2178 && !have_latest_first_page
2179 && !using_history
2180 {
2181 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2182 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2183 have_latest_first_page = true;
2184 continue;
2185 }
2186
2187 if raw.is_empty() {
2189 break;
2190 }
2191 }
2192 pages += 1;
2195
2196 let ts_init = self.generate_ts_init();
2198 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2199 for r in &raw {
2200 page.push(parse_candlestick(
2201 r,
2202 bar_type,
2203 inst.price_precision(),
2204 inst.size_precision(),
2205 ts_init,
2206 )?);
2207 }
2208 page.reverse();
2209
2210 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2211 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2212
2213 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2217 && out.is_empty()
2218 && pages < 2
2219 {
2220 let tolerance_ns = slot_ns * 2; if !page.is_empty() {
2227 log::debug!(
2228 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2229 page.len(),
2230 page.first().unwrap().ts_event.as_i64() / 1_000_000,
2231 page.last().unwrap().ts_event.as_i64() / 1_000_000,
2232 start_ms,
2233 end_ms
2234 );
2235 }
2236
2237 let result: Vec<Bar> = page
2238 .clone()
2239 .into_iter()
2240 .filter(|b| {
2241 let ts = b.ts_event.as_i64();
2242 let ok_after =
2244 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2245 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2246 ok_after && ok_before
2247 })
2248 .collect();
2249
2250 result
2251 } else {
2252 page.clone()
2254 .into_iter()
2255 .filter(|b| {
2256 let ts = b.ts_event.as_i64();
2257 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2258 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2259 ok_after && ok_before
2260 })
2261 .collect()
2262 };
2263
2264 if !page.is_empty() && filtered.is_empty() {
2265 if matches!(mode, Mode::Range)
2267 && !forward_prepend_mode
2268 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2269 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2270 {
2271 break;
2273 }
2274 }
2275
2276 let contribution;
2278
2279 if out.is_empty() {
2280 contribution = filtered.len();
2281 out = filtered;
2282 } else {
2283 match mode {
2284 Mode::Backward | Mode::Latest => {
2285 if let Some(first) = out.first() {
2286 filtered.retain(|b| b.ts_event < first.ts_event);
2287 }
2288 contribution = filtered.len();
2289 if contribution != 0 {
2290 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2291 new_out.extend_from_slice(&filtered);
2292 new_out.extend_from_slice(&out);
2293 out = new_out;
2294 }
2295 }
2296 Mode::Range => {
2297 if forward_prepend_mode || req_used_before {
2298 if let Some(first) = out.first() {
2300 filtered.retain(|b| b.ts_event < first.ts_event);
2301 }
2302 contribution = filtered.len();
2303 if contribution != 0 {
2304 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2305 new_out.extend_from_slice(&filtered);
2306 new_out.extend_from_slice(&out);
2307 out = new_out;
2308 }
2309 } else {
2310 if let Some(last) = out.last() {
2312 filtered.retain(|b| b.ts_event > last.ts_event);
2313 }
2314 contribution = filtered.len();
2315 out.extend(filtered);
2316 }
2317 }
2318 }
2319 }
2320
2321 if contribution == 0
2323 && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2324 && let Some(b) = before_ms
2325 {
2326 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2327 let new_b = b.saturating_sub(jump);
2328 if new_b != b {
2329 before_ms = Some(new_b);
2330 }
2331 }
2332
2333 if contribution == 0 {
2334 progressless_loops = progressless_loops.saturating_add(1);
2335 if progressless_loops >= 3 {
2336 break;
2337 }
2338 } else {
2339 progressless_loops = 0;
2340
2341 match mode {
2343 Mode::Latest | Mode::Backward => {
2344 if let Some(oldest) = page_oldest_ms {
2345 before_ms = Some(oldest.saturating_sub(1));
2346 have_latest_first_page = true;
2347 } else {
2348 break;
2349 }
2350 }
2351 Mode::Range => {
2352 if forward_prepend_mode || req_used_before {
2353 if let Some(oldest) = page_oldest_ms {
2354 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
2357 after_ms = None;
2358 } else {
2359 break;
2360 }
2361 } else if let Some(newest) = page_newest_ms {
2362 after_ms = Some(newest.saturating_add(1));
2363 before_ms = None;
2364 } else {
2365 break;
2366 }
2367 }
2368 }
2369 }
2370
2371 if let Some(lim) = limit
2373 && lim > 0
2374 && out.len() >= lim as usize
2375 {
2376 break;
2377 }
2378 if let Some(ens) = end_ns
2379 && let Some(last) = out.last()
2380 && last.ts_event.as_i64() >= ens
2381 {
2382 break;
2383 }
2384 if let Some(sns) = start_ns
2385 && let Some(first) = out.first()
2386 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
2387 && first.ts_event.as_i64() <= sns
2388 {
2389 if matches!(mode, Mode::Range) {
2391 if let Some(ens) = end_ns
2393 && let Some(last) = out.last()
2394 {
2395 let last_ts = last.ts_event.as_i64();
2396 if last_ts < ens {
2397 forward_prepend_mode = false;
2400 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
2401 before_ms = None;
2402 continue;
2403 }
2404 }
2405 }
2406 break;
2407 }
2408
2409 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2410 }
2411
2412 if out.is_empty() && matches!(mode, Mode::Range) {
2414 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
2415 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
2416 let mut p = GetCandlesticksParamsBuilder::default();
2417 p.inst_id(symbol.as_str())
2418 .bar(&bar_param)
2419 .limit(300)
2420 .before_ms(pivot);
2421 let params = p.build().map_err(anyhow::Error::new)?;
2422 let raw = if hist {
2423 self.inner.get_history_candles(params).await
2424 } else {
2425 self.inner.get_candles(params).await
2426 }
2427 .map_err(anyhow::Error::new)?;
2428 if !raw.is_empty() {
2429 let ts_init = self.generate_ts_init();
2430 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2431 for r in &raw {
2432 page.push(parse_candlestick(
2433 r,
2434 bar_type,
2435 inst.price_precision(),
2436 inst.size_precision(),
2437 ts_init,
2438 )?);
2439 }
2440 page.reverse();
2441 out = page
2442 .into_iter()
2443 .filter(|b| {
2444 let ts = b.ts_event.as_i64();
2445 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2446 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2447 ok_after && ok_before
2448 })
2449 .collect();
2450 }
2451 }
2452
2453 if let Some(ens) = end_ns {
2455 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2456 out.pop();
2457 }
2458 }
2459
2460 if matches!(mode, Mode::Range)
2462 && !forward_prepend_mode
2463 && let Some(sns) = start_ns
2464 {
2465 let lower = sns.saturating_sub(slot_ns);
2466 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2467 out.remove(0);
2468 }
2469 }
2470
2471 if let Some(lim) = limit
2472 && lim > 0
2473 && out.len() > lim as usize
2474 {
2475 out.truncate(lim as usize);
2476 }
2477
2478 Ok(out)
2479 }
2480
2481 #[allow(clippy::too_many_arguments)]
2492 pub async fn request_order_status_reports(
2493 &self,
2494 account_id: AccountId,
2495 instrument_type: Option<OKXInstrumentType>,
2496 instrument_id: Option<InstrumentId>,
2497 start: Option<DateTime<Utc>>,
2498 end: Option<DateTime<Utc>>,
2499 open_only: bool,
2500 limit: Option<u32>,
2501 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2502 let mut history_params = GetOrderHistoryParamsBuilder::default();
2503
2504 let instrument_type = if let Some(instrument_type) = instrument_type {
2505 instrument_type
2506 } else {
2507 let instrument_id = instrument_id.ok_or_else(|| {
2508 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2509 })?;
2510 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2511 okx_instrument_type(&instrument)?
2512 };
2513
2514 history_params.inst_type(instrument_type);
2515
2516 if let Some(instrument_id) = instrument_id.as_ref() {
2517 history_params.inst_id(instrument_id.symbol.inner().to_string());
2518 }
2519
2520 if let Some(limit) = limit {
2521 history_params.limit(limit);
2522 }
2523
2524 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2525
2526 let mut pending_params = GetOrderListParamsBuilder::default();
2527 pending_params.inst_type(instrument_type);
2528
2529 if let Some(instrument_id) = instrument_id.as_ref() {
2530 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2531 }
2532
2533 if let Some(limit) = limit {
2534 pending_params.limit(limit);
2535 }
2536
2537 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2538
2539 let combined_resp = if open_only {
2540 self.inner
2542 .get_orders_pending(pending_params)
2543 .await
2544 .map_err(|e| anyhow::anyhow!(e))?
2545 } else {
2546 let (history_resp, pending_resp) = tokio::try_join!(
2548 self.inner.get_orders_history(history_params),
2549 self.inner.get_orders_pending(pending_params)
2550 )
2551 .map_err(|e| anyhow::anyhow!(e))?;
2552
2553 let mut combined_resp = history_resp;
2555 combined_resp.extend(pending_resp);
2556 combined_resp
2557 };
2558
2559 let start_ns = start.map(UnixNanos::from);
2561 let end_ns = end.map(UnixNanos::from);
2562
2563 let ts_init = self.generate_ts_init();
2564 let mut reports = Vec::with_capacity(combined_resp.len());
2565
2566 let mut seen: AHashSet<String> = AHashSet::new();
2568
2569 for order in combined_resp {
2570 let seen_key = if !order.cl_ord_id.is_empty() {
2571 order.cl_ord_id.as_str().to_string()
2572 } else if let Some(algo_cl_ord_id) = order
2573 .algo_cl_ord_id
2574 .as_ref()
2575 .filter(|value| !value.as_str().is_empty())
2576 {
2577 algo_cl_ord_id.as_str().to_string()
2578 } else if let Some(algo_id) = order
2579 .algo_id
2580 .as_ref()
2581 .filter(|value| !value.as_str().is_empty())
2582 {
2583 algo_id.as_str().to_string()
2584 } else {
2585 order.ord_id.as_str().to_string()
2586 };
2587
2588 if !seen.insert(seen_key) {
2589 continue; }
2591
2592 let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2593 log::debug!(
2594 "Skipping order report for instrument not in cache: symbol={}",
2595 order.inst_id,
2596 );
2597 continue;
2598 };
2599
2600 let report = match parse_order_status_report(
2601 &order,
2602 account_id,
2603 inst.id(),
2604 inst.price_precision(),
2605 inst.size_precision(),
2606 ts_init,
2607 ) {
2608 Ok(report) => report,
2609 Err(e) => {
2610 log::error!("Failed to parse order status report: {e}");
2611 continue;
2612 }
2613 };
2614
2615 if let Some(start_ns) = start_ns
2616 && report.ts_last < start_ns
2617 {
2618 continue;
2619 }
2620 if let Some(end_ns) = end_ns
2621 && report.ts_last > end_ns
2622 {
2623 continue;
2624 }
2625
2626 reports.push(report);
2627 }
2628
2629 Ok(reports)
2630 }
2631
2632 pub async fn request_fill_reports(
2642 &self,
2643 account_id: AccountId,
2644 instrument_type: Option<OKXInstrumentType>,
2645 instrument_id: Option<InstrumentId>,
2646 start: Option<DateTime<Utc>>,
2647 end: Option<DateTime<Utc>>,
2648 limit: Option<u32>,
2649 ) -> anyhow::Result<Vec<FillReport>> {
2650 let mut params = GetTransactionDetailsParamsBuilder::default();
2651
2652 let instrument_type = if let Some(instrument_type) = instrument_type {
2653 instrument_type
2654 } else {
2655 let instrument_id = instrument_id.ok_or_else(|| {
2656 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2657 })?;
2658 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2659 okx_instrument_type(&instrument)?
2660 };
2661
2662 params.inst_type(instrument_type);
2663
2664 if let Some(instrument_id) = instrument_id {
2665 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2666 let instrument_type = okx_instrument_type(&instrument)?;
2667 params.inst_type(instrument_type);
2668 params.inst_id(instrument_id.symbol.inner().to_string());
2669 }
2670
2671 if let Some(limit) = limit {
2672 params.limit(limit);
2673 }
2674
2675 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2676
2677 let resp = self
2678 .inner
2679 .get_fills(params)
2680 .await
2681 .map_err(|e| anyhow::anyhow!(e))?;
2682
2683 let start_ns = start.map(UnixNanos::from);
2685 let end_ns = end.map(UnixNanos::from);
2686
2687 let ts_init = self.generate_ts_init();
2688 let mut reports = Vec::with_capacity(resp.len());
2689
2690 for detail in resp {
2691 if detail.fill_sz.is_empty() {
2693 continue;
2694 }
2695 if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2696 if qty <= 0.0 {
2697 continue;
2698 }
2699 } else {
2700 continue;
2702 }
2703
2704 let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2705 log::debug!(
2706 "Skipping fill report for instrument not in cache: symbol={}",
2707 detail.inst_id,
2708 );
2709 continue;
2710 };
2711
2712 let report = match parse_fill_report(
2713 detail,
2714 account_id,
2715 inst.id(),
2716 inst.price_precision(),
2717 inst.size_precision(),
2718 ts_init,
2719 ) {
2720 Ok(report) => report,
2721 Err(e) => {
2722 log::error!("Failed to parse fill report: {e}");
2723 continue;
2724 }
2725 };
2726
2727 if let Some(start_ns) = start_ns
2728 && report.ts_event < start_ns
2729 {
2730 continue;
2731 }
2732
2733 if let Some(end_ns) = end_ns
2734 && report.ts_event > end_ns
2735 {
2736 continue;
2737 }
2738
2739 reports.push(report);
2740 }
2741
2742 Ok(reports)
2743 }
2744
2745 pub async fn request_position_status_reports(
2772 &self,
2773 account_id: AccountId,
2774 instrument_type: Option<OKXInstrumentType>,
2775 instrument_id: Option<InstrumentId>,
2776 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2777 let mut params = GetPositionsParamsBuilder::default();
2778
2779 let instrument_type = if let Some(instrument_type) = instrument_type {
2780 instrument_type
2781 } else {
2782 let instrument_id = instrument_id.ok_or_else(|| {
2783 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2784 })?;
2785 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2786 okx_instrument_type(&instrument)?
2787 };
2788
2789 params.inst_type(instrument_type);
2790
2791 instrument_id
2792 .as_ref()
2793 .map(|i| params.inst_id(i.symbol.inner()));
2794
2795 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2796
2797 let resp = self
2798 .inner
2799 .get_positions(params)
2800 .await
2801 .map_err(|e| anyhow::anyhow!(e))?;
2802
2803 let ts_init = self.generate_ts_init();
2804 let mut reports = Vec::with_capacity(resp.len());
2805
2806 for position in resp {
2807 let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2808 log::debug!(
2809 "Skipping position report for instrument not in cache: symbol={}",
2810 position.inst_id,
2811 );
2812 continue;
2813 };
2814
2815 match parse_position_status_report(
2816 position,
2817 account_id,
2818 inst.id(),
2819 inst.size_precision(),
2820 ts_init,
2821 ) {
2822 Ok(report) => reports.push(report),
2823 Err(e) => {
2824 log::error!("Failed to parse position status report: {e}");
2825 continue;
2826 }
2827 };
2828 }
2829
2830 Ok(reports)
2831 }
2832
2833 pub async fn request_spot_margin_position_reports(
2848 &self,
2849 account_id: AccountId,
2850 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2851 let accounts = self
2852 .inner
2853 .get_balance()
2854 .await
2855 .map_err(|e| anyhow::anyhow!(e))?;
2856
2857 let ts_init = self.generate_ts_init();
2858 let mut reports = Vec::new();
2859
2860 for account in accounts {
2861 for balance in account.details {
2862 let ccy_str = balance.ccy.as_str();
2863
2864 let potential_symbols = [
2866 format!("{ccy_str}-USDT"),
2867 format!("{ccy_str}-USD"),
2868 format!("{ccy_str}-USDC"),
2869 ];
2870
2871 let instrument_result = potential_symbols.iter().find_map(|symbol| {
2872 self.instrument_from_cache(Ustr::from(symbol))
2873 .ok()
2874 .map(|inst| (inst.id(), inst.size_precision()))
2875 });
2876
2877 let (instrument_id, size_precision) = match instrument_result {
2878 Some((id, prec)) => (id, prec),
2879 None => {
2880 log::debug!(
2881 "Skipping balance for {ccy_str} - no matching instrument in cache"
2882 );
2883 continue;
2884 }
2885 };
2886
2887 match parse_spot_margin_position_from_balance(
2888 &balance,
2889 account_id,
2890 instrument_id,
2891 size_precision,
2892 ts_init,
2893 ) {
2894 Ok(Some(report)) => reports.push(report),
2895 Ok(None) => {} Err(e) => {
2897 log::error!(
2898 "Failed to parse spot margin position from balance for {ccy_str}: {e}"
2899 );
2900 continue;
2901 }
2902 };
2903 }
2904 }
2905
2906 Ok(reports)
2907 }
2908
2909 pub async fn place_algo_order(
2919 &self,
2920 request: OKXPlaceAlgoOrderRequest,
2921 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2922 let body =
2923 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2924
2925 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2926 .inner
2927 .send_request::<_, ()>(
2928 Method::POST,
2929 "/api/v5/trade/order-algo",
2930 None,
2931 Some(body),
2932 true,
2933 )
2934 .await?;
2935
2936 resp.into_iter()
2937 .next()
2938 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2939 }
2940
2941 pub async fn cancel_algo_order(
2951 &self,
2952 request: OKXCancelAlgoOrderRequest,
2953 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2954 let body =
2957 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2958
2959 let resp: Vec<OKXCancelAlgoOrderResponse> = self
2960 .inner
2961 .send_request::<_, ()>(
2962 Method::POST,
2963 "/api/v5/trade/cancel-algos",
2964 None,
2965 Some(body),
2966 true,
2967 )
2968 .await?;
2969
2970 resp.into_iter()
2971 .next()
2972 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2973 }
2974
2975 #[allow(clippy::too_many_arguments)]
2984 pub async fn place_algo_order_with_domain_types(
2985 &self,
2986 instrument_id: InstrumentId,
2987 td_mode: OKXTradeMode,
2988 client_order_id: ClientOrderId,
2989 order_side: OrderSide,
2990 order_type: OrderType,
2991 quantity: Quantity,
2992 trigger_price: Price,
2993 trigger_type: Option<TriggerType>,
2994 limit_price: Option<Price>,
2995 reduce_only: Option<bool>,
2996 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2997 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2998 return Err(OKXHttpError::ValidationError(
2999 "Invalid order side".to_string(),
3000 ));
3001 }
3002 let okx_side: OKXSide = order_side.into();
3003
3004 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3006
3007 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3009 limit_price.map(|p| p.to_string())
3010 } else {
3011 Some("-1".to_string())
3013 };
3014
3015 let request = OKXPlaceAlgoOrderRequest {
3016 inst_id: instrument_id.symbol.as_str().to_string(),
3017 td_mode,
3018 side: okx_side,
3019 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
3021 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3022 trigger_px: Some(trigger_price.to_string()),
3023 order_px,
3024 trigger_px_type: Some(trigger_px_type_enum),
3025 tgt_ccy: None, pos_side: None, close_position: None,
3028 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3029 reduce_only,
3030 };
3031
3032 self.place_algo_order(request).await
3033 }
3034
3035 pub async fn cancel_algo_order_with_domain_types(
3044 &self,
3045 instrument_id: InstrumentId,
3046 algo_id: String,
3047 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3048 let request = OKXCancelAlgoOrderRequest {
3049 inst_id: instrument_id.symbol.to_string(),
3050 algo_id: Some(algo_id),
3051 algo_cl_ord_id: None,
3052 };
3053
3054 self.cancel_algo_order(request).await
3055 }
3056
3057 #[allow(clippy::too_many_arguments)]
3063 pub async fn request_algo_order_status_reports(
3064 &self,
3065 account_id: AccountId,
3066 instrument_type: Option<OKXInstrumentType>,
3067 instrument_id: Option<InstrumentId>,
3068 algo_id: Option<String>,
3069 algo_client_order_id: Option<ClientOrderId>,
3070 state: Option<OKXOrderStatus>,
3071 limit: Option<u32>,
3072 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3073 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3074
3075 let inst_type = if let Some(inst_type) = instrument_type {
3076 inst_type
3077 } else if let Some(inst_id) = instrument_id {
3078 let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3079 let inst_type = okx_instrument_type(&instrument)?;
3080 instruments_cache.insert(inst_id.symbol.inner(), instrument);
3081 inst_type
3082 } else {
3083 anyhow::bail!("instrument_type or instrument_id required for algo order query")
3084 };
3085
3086 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3087 params_builder.inst_type(inst_type);
3088 if let Some(inst_id) = instrument_id {
3089 params_builder.inst_id(inst_id.symbol.inner().to_string());
3090 }
3091 if let Some(algo_id) = algo_id.as_ref() {
3092 params_builder.algo_id(algo_id.clone());
3093 }
3094 if let Some(client_order_id) = algo_client_order_id.as_ref() {
3095 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3096 }
3097 if let Some(state) = state {
3098 params_builder.state(state);
3099 }
3100 if let Some(limit) = limit {
3101 params_builder.limit(limit);
3102 }
3103
3104 let params = params_builder
3105 .build()
3106 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3107
3108 let ts_init = self.generate_ts_init();
3109 let mut reports = Vec::new();
3110 let mut seen: AHashSet<(String, String)> = AHashSet::new();
3111
3112 let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3113 Ok(result) => result,
3114 Err(OKXHttpError::UnexpectedStatus { status, .. })
3115 if status == StatusCode::NOT_FOUND =>
3116 {
3117 Vec::new()
3118 }
3119 Err(e) => return Err(e.into()),
3120 };
3121 self.collect_algo_reports(
3122 account_id,
3123 &pending,
3124 &mut instruments_cache,
3125 ts_init,
3126 &mut seen,
3127 &mut reports,
3128 )
3129 .await?;
3130
3131 let history = match self.inner.get_order_algo_history(params).await {
3132 Ok(result) => result,
3133 Err(OKXHttpError::UnexpectedStatus { status, .. })
3134 if status == StatusCode::NOT_FOUND =>
3135 {
3136 Vec::new()
3137 }
3138 Err(e) => return Err(e.into()),
3139 };
3140 self.collect_algo_reports(
3141 account_id,
3142 &history,
3143 &mut instruments_cache,
3144 ts_init,
3145 &mut seen,
3146 &mut reports,
3147 )
3148 .await?;
3149
3150 Ok(reports)
3151 }
3152
3153 pub async fn request_algo_order_status_report(
3159 &self,
3160 account_id: AccountId,
3161 instrument_id: InstrumentId,
3162 algo_client_order_id: ClientOrderId,
3163 ) -> anyhow::Result<Option<OrderStatusReport>> {
3164 let reports = self
3165 .request_algo_order_status_reports(
3166 account_id,
3167 None,
3168 Some(instrument_id),
3169 None,
3170 Some(algo_client_order_id),
3171 None,
3172 Some(50_u32),
3173 )
3174 .await?;
3175
3176 Ok(reports.into_iter().next())
3177 }
3178
3179 pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3181 &self.inner
3182 }
3183
3184 async fn collect_algo_reports(
3185 &self,
3186 account_id: AccountId,
3187 orders: &[OKXOrderAlgo],
3188 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3189 ts_init: UnixNanos,
3190 seen: &mut AHashSet<(String, String)>,
3191 reports: &mut Vec<OrderStatusReport>,
3192 ) -> anyhow::Result<()> {
3193 for order in orders {
3194 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3195 if !seen.insert(key) {
3196 continue;
3197 }
3198
3199 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3200 instrument.clone()
3201 } else {
3202 let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3203 log::debug!(
3204 "Skipping algo order report for instrument not in cache: symbol={}",
3205 order.inst_id,
3206 );
3207 continue;
3208 };
3209 instruments_cache.insert(order.inst_id, instrument.clone());
3210 instrument
3211 };
3212
3213 match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3214 Ok(report) => reports.push(report),
3215 Err(e) => {
3216 log::error!("Failed to parse algo order report: {e}");
3217 }
3218 }
3219 }
3220
3221 Ok(())
3222 }
3223}
3224
3225fn parse_http_algo_order(
3226 order: &OKXOrderAlgo,
3227 account_id: AccountId,
3228 instrument: &InstrumentAny,
3229 ts_init: UnixNanos,
3230) -> anyhow::Result<OrderStatusReport> {
3231 let ord_px = if order.ord_px.is_empty() {
3232 "-1".to_string()
3233 } else {
3234 order.ord_px.clone()
3235 };
3236
3237 let reduce_only = if order.reduce_only.is_empty() {
3238 "false".to_string()
3239 } else {
3240 order.reduce_only.clone()
3241 };
3242
3243 let msg = OKXAlgoOrderMsg {
3244 algo_id: order.algo_id.clone(),
3245 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3246 cl_ord_id: order.cl_ord_id.clone(),
3247 ord_id: order.ord_id.clone(),
3248 inst_id: order.inst_id,
3249 inst_type: order.inst_type,
3250 ord_type: order.ord_type,
3251 state: order.state,
3252 side: order.side,
3253 pos_side: order.pos_side,
3254 sz: order.sz.clone(),
3255 trigger_px: order.trigger_px.clone(),
3256 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3257 ord_px,
3258 td_mode: order.td_mode,
3259 lever: order.lever.clone(),
3260 reduce_only,
3261 actual_px: order.actual_px.clone(),
3262 actual_sz: order.actual_sz.clone(),
3263 notional_usd: order.notional_usd.clone(),
3264 c_time: order.c_time,
3265 u_time: order.u_time,
3266 trigger_time: order.trigger_time.clone(),
3267 tag: order.tag.clone(),
3268 };
3269
3270 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3271}