1use std::{
37 collections::HashMap,
38 fmt::{Debug, Formatter},
39 num::NonZeroU32,
40 str::FromStr,
41 sync::{
42 Arc, LazyLock,
43 atomic::{AtomicBool, Ordering},
44 },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56 events::AccountState,
57 identifiers::{AccountId, ClientOrderId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59 reports::{FillReport, OrderStatusReport, PositionStatusReport},
60 types::{Price, Quantity},
61};
62use nautilus_network::{
63 http::{HttpClient, Method, StatusCode, USER_AGENT},
64 ratelimiter::quota::Quota,
65 retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73 error::OKXHttpError,
74 models::{
75 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78 OKXTransactionDetail,
79 },
80 query::{
81 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89 },
90};
91use crate::{
92 common::{
93 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94 credential::Credential,
95 enums::{
96 OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97 OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98 },
99 models::OKXInstrument,
100 parse::{
101 okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102 parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103 parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104 parse_spot_margin_position_from_balance, parse_trade_tick,
105 },
106 },
107 http::{
108 models::{OKXCandlestick, OKXTrade},
109 query::GetOrderParams,
110 },
111 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
/// Value of the `code` field that marks an OKX response as successful; any other
/// value is turned into an `OKXHttpError::OkxError` by the raw client.
const OKX_SUCCESS_CODE: &str = "0";

/// Quota of 250 requests per second, registered under the global rate-limit key
/// and also passed to the HTTP client on construction.
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));

/// Rate-limit key that is attached to every request, independent of the endpoint.
const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
128
/// Envelope used to deserialize OKX REST responses: a result code, a message and
/// a list of payload items of type `T`.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// Result code; the client treats `"0"` as success and anything else as an OKX error.
    pub code: String,
    /// Message accompanying the code; used as the error message when the code is not `"0"`.
    pub msg: String,
    /// Payload items returned to the caller on success.
    pub data: Vec<T>,
}
139
/// Low-level OKX REST client: builds, optionally signs, rate-limits and retries
/// requests, returning the deserialized `data` items of each response.
pub struct OKXRawHttpClient {
    /// Prefix prepended to every request path.
    base_url: String,
    /// Underlying HTTP client configured with default headers and rate-limit quotas.
    client: HttpClient,
    /// API credentials; `None` means authenticated requests fail with `MissingCredentials`.
    credential: Option<Credential>,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<OKXHttpError>,
    /// Token used to cancel in-flight and pending requests.
    cancellation_token: CancellationToken,
    /// Whether the client adds the `x-simulated-trading` header (demo trading).
    is_demo: bool,
}
153
154impl Default for OKXRawHttpClient {
155 fn default() -> Self {
156 Self::new(None, Some(60), None, None, None, false, None)
157 .expect("Failed to create default OKXRawHttpClient")
158 }
159}
160
161impl Debug for OKXRawHttpClient {
162 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
163 let credential = self.credential.as_ref().map(|_| "<redacted>");
164 f.debug_struct(stringify!(OKXRawHttpClient))
165 .field("base_url", &self.base_url)
166 .field("credential", &credential)
167 .finish_non_exhaustive()
168 }
169}
170
171impl OKXRawHttpClient {
172 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
173 vec![
174 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
175 (
176 "okx:/api/v5/account/balance".to_string(),
177 Quota::per_second(NonZeroU32::new(5).unwrap()),
178 ),
179 (
180 "okx:/api/v5/public/instruments".to_string(),
181 Quota::per_second(NonZeroU32::new(10).unwrap()),
182 ),
183 (
184 "okx:/api/v5/market/candles".to_string(),
185 Quota::per_second(NonZeroU32::new(50).unwrap()),
186 ),
187 (
188 "okx:/api/v5/market/history-candles".to_string(),
189 Quota::per_second(NonZeroU32::new(20).unwrap()),
190 ),
191 (
192 "okx:/api/v5/market/history-trades".to_string(),
193 Quota::per_second(NonZeroU32::new(30).unwrap()),
194 ),
195 (
196 "okx:/api/v5/trade/order".to_string(),
197 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
199 (
200 "okx:/api/v5/trade/orders-pending".to_string(),
201 Quota::per_second(NonZeroU32::new(20).unwrap()),
202 ),
203 (
204 "okx:/api/v5/trade/orders-history".to_string(),
205 Quota::per_second(NonZeroU32::new(20).unwrap()),
206 ),
207 (
208 "okx:/api/v5/trade/fills".to_string(),
209 Quota::per_second(NonZeroU32::new(30).unwrap()),
210 ),
211 (
212 "okx:/api/v5/trade/order-algo".to_string(),
213 Quota::per_second(NonZeroU32::new(10).unwrap()),
214 ),
215 (
216 "okx:/api/v5/trade/cancel-algos".to_string(),
217 Quota::per_second(NonZeroU32::new(10).unwrap()),
218 ),
219 ]
220 }
221
222 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
223 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
224 let route = format!("okx:{normalized}");
225
226 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
227 }
228
229 pub fn cancel_all_requests(&self) {
231 self.cancellation_token.cancel();
232 }
233
234 pub fn cancellation_token(&self) -> &CancellationToken {
236 &self.cancellation_token
237 }
238
239 pub fn new(
249 base_url: Option<String>,
250 timeout_secs: Option<u64>,
251 max_retries: Option<u32>,
252 retry_delay_ms: Option<u64>,
253 retry_delay_max_ms: Option<u64>,
254 is_demo: bool,
255 proxy_url: Option<String>,
256 ) -> Result<Self, OKXHttpError> {
257 let retry_config = RetryConfig {
258 max_retries: max_retries.unwrap_or(3),
259 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
260 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
261 backoff_factor: 2.0,
262 jitter_ms: 1000,
263 operation_timeout_ms: Some(60_000),
264 immediate_first: false,
265 max_elapsed_ms: Some(180_000),
266 };
267
268 let retry_manager = RetryManager::new(retry_config);
269
270 Ok(Self {
271 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
272 client: HttpClient::new(
273 Self::default_headers(is_demo),
274 vec![],
275 Self::rate_limiter_quotas(),
276 Some(*OKX_REST_QUOTA),
277 timeout_secs,
278 proxy_url,
279 )
280 .map_err(|e| {
281 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
282 })?,
283 credential: None,
284 retry_manager,
285 cancellation_token: CancellationToken::new(),
286 is_demo,
287 })
288 }
289
290 #[allow(clippy::too_many_arguments)]
297 pub fn with_credentials(
298 api_key: String,
299 api_secret: String,
300 api_passphrase: String,
301 base_url: String,
302 timeout_secs: Option<u64>,
303 max_retries: Option<u32>,
304 retry_delay_ms: Option<u64>,
305 retry_delay_max_ms: Option<u64>,
306 is_demo: bool,
307 proxy_url: Option<String>,
308 ) -> Result<Self, OKXHttpError> {
309 let retry_config = RetryConfig {
310 max_retries: max_retries.unwrap_or(3),
311 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
312 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
313 backoff_factor: 2.0,
314 jitter_ms: 1000,
315 operation_timeout_ms: Some(60_000),
316 immediate_first: false,
317 max_elapsed_ms: Some(180_000),
318 };
319
320 let retry_manager = RetryManager::new(retry_config);
321
322 Ok(Self {
323 base_url,
324 client: HttpClient::new(
325 Self::default_headers(is_demo),
326 vec![],
327 Self::rate_limiter_quotas(),
328 Some(*OKX_REST_QUOTA),
329 timeout_secs,
330 proxy_url,
331 )
332 .map_err(|e| {
333 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
334 })?,
335 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
336 retry_manager,
337 cancellation_token: CancellationToken::new(),
338 is_demo,
339 })
340 }
341
342 fn default_headers(is_demo: bool) -> HashMap<String, String> {
344 let mut headers =
345 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
346
347 if is_demo {
348 headers.insert("x-simulated-trading".to_string(), "1".to_string());
349 }
350
351 headers
352 }
353
354 fn sign_request(
361 &self,
362 method: &Method,
363 path: &str,
364 body: Option<&[u8]>,
365 ) -> Result<HashMap<String, String>, OKXHttpError> {
366 let credential = match self.credential.as_ref() {
367 Some(c) => c,
368 None => return Err(OKXHttpError::MissingCredentials),
369 };
370
371 let api_key = credential.api_key.to_string();
372 let api_passphrase = credential.api_passphrase.clone();
373
374 let now = Utc::now();
376 let millis = now.timestamp_subsec_millis();
377 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
378 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
379
380 let mut headers = HashMap::new();
381 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
382 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
383 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
384 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
385
386 Ok(headers)
387 }
388
    /// Sends one request to `path` (relative to the base URL) through the retry
    /// manager and returns the `data` items of the OKX response.
    ///
    /// - `params`: optional query parameters; serialized into the signed path and
    ///   also passed to the HTTP client.
    /// - `body`: optional raw request body; when present a JSON content type is set.
    /// - `authenticate`: when `true`, the request is signed via `sign_request`.
    ///
    /// # Errors
    ///
    /// - `OKXHttpError::JsonError` if the params cannot be serialized or a
    ///   successful response cannot be deserialized.
    /// - `OKXHttpError::OkxError` if OKX returns a code other than `"0"`, or a
    ///   non-success HTTP status whose body parses as an OKX response.
    /// - `OKXHttpError::UnexpectedStatus` for other non-success HTTP statuses.
    /// - `OKXHttpError::Canceled` / `OKXHttpError::ValidationError` for errors
    ///   produced by the retry manager itself.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        path: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<Vec<T>, OKXHttpError> {
        let url = format!("{}{path}", self.base_url);

        // Global key plus the per-route key for this path.
        let rate_keys: Vec<String> = Self::rate_limit_keys(path)
            .into_iter()
            .map(|k| k.to_string())
            .collect();

        // The closure clones its inputs so it can produce a fresh future each time
        // it is called (presumably once per attempt by the retry manager).
        let operation = || {
            let url = url.clone();
            let method = method.clone();
            let body = body.clone();
            let rate_keys = rate_keys.clone();

            async move {
                // Serialize the query so it can be included in the signed path.
                let query_string = if let Some(p) = params {
                    serde_urlencoded::to_string(p).map_err(|e| {
                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
                    })?
                } else {
                    String::new()
                };

                // Path as signed: includes the query string when there is one.
                let full_path = if query_string.is_empty() {
                    path.to_string()
                } else {
                    format!("{path}?{query_string}")
                };

                let mut headers = if authenticate {
                    self.sign_request(&method, &full_path, body.as_deref())?
                } else {
                    HashMap::new()
                };

                // Any body is sent as JSON.
                if body.is_some() {
                    headers.insert("Content-Type".to_string(), "application/json".to_string());
                }

                let resp = self
                    .client
                    .request_with_params(
                        method.clone(),
                        url,
                        params,
                        Some(headers),
                        body,
                        None,
                        Some(rate_keys),
                    )
                    .await?;

                tracing::trace!("Response: {resp:?}");

                if resp.status.is_success() {
                    let okx_response: OKXResponse<T> =
                        serde_json::from_slice(&resp.body).map_err(|e| {
                            tracing::error!("Failed to deserialize OKXResponse: {e}");
                            OKXHttpError::JsonError(e.to_string())
                        })?;

                    // A successful HTTP status can still carry an OKX-level error code.
                    if okx_response.code != OKX_SUCCESS_CODE {
                        return Err(OKXHttpError::OkxError {
                            error_code: okx_response.code,
                            message: okx_response.msg,
                        });
                    }

                    Ok(okx_response.data)
                } else {
                    let error_body = String::from_utf8_lossy(&resp.body);
                    // 404 responses are logged at debug level, everything else at error level.
                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
                        tracing::debug!("HTTP 404 with body: {error_body}");
                    } else {
                        tracing::error!(
                            "HTTP error {} with body: {error_body}",
                            resp.status.as_str()
                        );
                    }

                    // Prefer the structured OKX error when the body parses as one.
                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
                        return Err(OKXHttpError::OkxError {
                            error_code: parsed_error.code,
                            message: parsed_error.msg,
                        });
                    }

                    Err(OKXHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
                        body: error_body.to_string(),
                    })
                }
            }
        };

        // Retry on transport errors, HTTP 5xx/429, and OKX codes flagged as retryable.
        let should_retry = |error: &OKXHttpError| -> bool {
            match error {
                OKXHttpError::HttpClientError(_) => true,
                OKXHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
                _ => false,
            }
        };

        // Maps messages produced by the retry manager into client errors; the
        // literal "canceled" is mapped to `Canceled`.
        let create_error = |msg: String| -> OKXHttpError {
            if msg == "canceled" {
                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                OKXHttpError::ValidationError(msg)
            }
        };

        self.retry_manager
            .execute_with_retry_with_cancel(
                path,
                operation,
                should_retry,
                create_error,
                &self.cancellation_token,
            )
            .await
    }
547
548 pub async fn set_position_mode(
559 &self,
560 params: SetPositionModeParams,
561 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
562 let path = "/api/v5/account/set-position-mode";
563 let body = serde_json::to_vec(¶ms)?;
564 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
565 .await
566 }
567
568 pub async fn get_position_tiers(
579 &self,
580 params: GetPositionTiersParams,
581 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
582 self.send_request(
583 Method::GET,
584 "/api/v5/public/position-tiers",
585 Some(¶ms),
586 None,
587 false,
588 )
589 .await
590 }
591
592 pub async fn get_instruments(
603 &self,
604 params: GetInstrumentsParams,
605 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
606 self.send_request(
607 Method::GET,
608 "/api/v5/public/instruments",
609 Some(¶ms),
610 None,
611 false,
612 )
613 .await
614 }
615
616 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
630 let response: Vec<OKXServerTime> = self
631 .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
632 .await?;
633 response
634 .first()
635 .map(|t| t.ts)
636 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
637 }
638
639 pub async fn get_mark_price(
653 &self,
654 params: GetMarkPriceParams,
655 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
656 self.send_request(
657 Method::GET,
658 "/api/v5/public/mark-price",
659 Some(¶ms),
660 None,
661 false,
662 )
663 .await
664 }
665
666 pub async fn get_index_tickers(
676 &self,
677 params: GetIndexTickerParams,
678 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
679 self.send_request(
680 Method::GET,
681 "/api/v5/market/index-tickers",
682 Some(¶ms),
683 None,
684 false,
685 )
686 .await
687 }
688
689 pub async fn get_history_trades(
699 &self,
700 params: GetTradesParams,
701 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
702 self.send_request(
703 Method::GET,
704 "/api/v5/market/history-trades",
705 Some(¶ms),
706 None,
707 false,
708 )
709 .await
710 }
711
712 pub async fn get_candles(
722 &self,
723 params: GetCandlesticksParams,
724 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
725 self.send_request(
726 Method::GET,
727 "/api/v5/market/candles",
728 Some(¶ms),
729 None,
730 false,
731 )
732 .await
733 }
734
735 pub async fn get_history_candles(
745 &self,
746 params: GetCandlesticksParams,
747 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
748 self.send_request(
749 Method::GET,
750 "/api/v5/market/history-candles",
751 Some(¶ms),
752 None,
753 false,
754 )
755 .await
756 }
757
758 pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
769 let path = "/api/v5/account/balance";
770 self.send_request::<_, ()>(Method::GET, path, None, None, true)
771 .await
772 }
773
774 pub async fn get_trade_fee(
786 &self,
787 params: GetTradeFeeParams,
788 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
789 self.send_request(
790 Method::GET,
791 "/api/v5/account/trade-fee",
792 Some(¶ms),
793 None,
794 true,
795 )
796 .await
797 }
798
799 pub async fn get_order(
809 &self,
810 params: GetOrderParams,
811 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
812 self.send_request(
813 Method::GET,
814 "/api/v5/trade/order",
815 Some(¶ms),
816 None,
817 true,
818 )
819 .await
820 }
821
822 pub async fn get_orders_pending(
832 &self,
833 params: GetOrderListParams,
834 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
835 self.send_request(
836 Method::GET,
837 "/api/v5/trade/orders-pending",
838 Some(¶ms),
839 None,
840 true,
841 )
842 .await
843 }
844
845 pub async fn get_orders_history(
855 &self,
856 params: GetOrderHistoryParams,
857 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
858 self.send_request(
859 Method::GET,
860 "/api/v5/trade/orders-history",
861 Some(¶ms),
862 None,
863 true,
864 )
865 .await
866 }
867
868 pub async fn get_order_algo_pending(
874 &self,
875 params: GetAlgoOrdersParams,
876 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
877 self.send_request(
878 Method::GET,
879 "/api/v5/trade/order-algo-pending",
880 Some(¶ms),
881 None,
882 true,
883 )
884 .await
885 }
886
887 pub async fn get_order_algo_history(
893 &self,
894 params: GetAlgoOrdersParams,
895 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
896 self.send_request(
897 Method::GET,
898 "/api/v5/trade/order-algo-history",
899 Some(¶ms),
900 None,
901 true,
902 )
903 .await
904 }
905
906 pub async fn get_fills(
916 &self,
917 params: GetTransactionDetailsParams,
918 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
919 self.send_request(
920 Method::GET,
921 "/api/v5/trade/fills",
922 Some(¶ms),
923 None,
924 true,
925 )
926 .await
927 }
928
929 pub async fn get_positions(
941 &self,
942 params: GetPositionsParams,
943 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
944 self.send_request(
945 Method::GET,
946 "/api/v5/account/positions",
947 Some(¶ms),
948 None,
949 true,
950 )
951 .await
952 }
953
954 pub async fn get_positions_history(
964 &self,
965 params: GetPositionsHistoryParams,
966 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
967 self.send_request(
968 Method::GET,
969 "/api/v5/account/positions-history",
970 Some(¶ms),
971 None,
972 true,
973 )
974 .await
975 }
976}
977
/// Higher-level OKX HTTP client that wraps a shared [`OKXRawHttpClient`] and keeps
/// a cache of instruments keyed by raw symbol.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared low-level client that performs the actual requests.
    pub(crate) inner: Arc<OKXRawHttpClient>,
    /// Instruments keyed by raw symbol; shared between clones of this client.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set once at least one instrument has been cached through this instance.
    /// Each clone gets its own flag, initialized from the source at clone time.
    cache_initialized: AtomicBool,
}
992
993impl Clone for OKXHttpClient {
994 fn clone(&self) -> Self {
995 let cache_initialized = AtomicBool::new(false);
996
997 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
998 if is_initialized {
999 cache_initialized.store(true, Ordering::Release);
1000 }
1001
1002 Self {
1003 inner: self.inner.clone(),
1004 instruments_cache: self.instruments_cache.clone(),
1005 cache_initialized,
1006 }
1007 }
1008}
1009
1010impl Default for OKXHttpClient {
1011 fn default() -> Self {
1012 Self::new(None, Some(60), None, None, None, false, None)
1013 .expect("Failed to create default OKXHttpClient")
1014 }
1015}
1016
1017impl OKXHttpClient {
1018 pub fn new(
1028 base_url: Option<String>,
1029 timeout_secs: Option<u64>,
1030 max_retries: Option<u32>,
1031 retry_delay_ms: Option<u64>,
1032 retry_delay_max_ms: Option<u64>,
1033 is_demo: bool,
1034 proxy_url: Option<String>,
1035 ) -> anyhow::Result<Self> {
1036 Ok(Self {
1037 inner: Arc::new(OKXRawHttpClient::new(
1038 base_url,
1039 timeout_secs,
1040 max_retries,
1041 retry_delay_ms,
1042 retry_delay_max_ms,
1043 is_demo,
1044 proxy_url,
1045 )?),
1046 instruments_cache: Arc::new(DashMap::new()),
1047 cache_initialized: AtomicBool::new(false),
1048 })
1049 }
1050
1051 fn generate_ts_init(&self) -> UnixNanos {
1053 get_atomic_clock_realtime().get_time_ns()
1054 }
1055
1056 pub fn from_env() -> anyhow::Result<Self> {
1063 Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1064 }
1065
1066 #[allow(clippy::too_many_arguments)]
1073 pub fn with_credentials(
1074 api_key: Option<String>,
1075 api_secret: Option<String>,
1076 api_passphrase: Option<String>,
1077 base_url: Option<String>,
1078 timeout_secs: Option<u64>,
1079 max_retries: Option<u32>,
1080 retry_delay_ms: Option<u64>,
1081 retry_delay_max_ms: Option<u64>,
1082 is_demo: bool,
1083 proxy_url: Option<String>,
1084 ) -> anyhow::Result<Self> {
1085 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1086 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1087 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1088 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1089
1090 Ok(Self {
1091 inner: Arc::new(OKXRawHttpClient::with_credentials(
1092 api_key,
1093 api_secret,
1094 api_passphrase,
1095 base_url,
1096 timeout_secs,
1097 max_retries,
1098 retry_delay_ms,
1099 retry_delay_max_ms,
1100 is_demo,
1101 proxy_url,
1102 )?),
1103 instruments_cache: Arc::new(DashMap::new()),
1104 cache_initialized: AtomicBool::new(false),
1105 })
1106 }
1107
1108 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1114 self.instruments_cache
1115 .get(&symbol)
1116 .map(|entry| entry.value().clone())
1117 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1118 }
1119
1120 pub fn cancel_all_requests(&self) {
1122 self.inner.cancel_all_requests();
1123 }
1124
1125 pub fn cancellation_token(&self) -> &CancellationToken {
1127 self.inner.cancellation_token()
1128 }
1129
1130 pub fn base_url(&self) -> &str {
1132 self.inner.base_url.as_str()
1133 }
1134
1135 pub fn api_key(&self) -> Option<&str> {
1137 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1138 }
1139
1140 #[must_use]
1142 pub fn api_key_masked(&self) -> Option<String> {
1143 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1144 }
1145
1146 #[must_use]
1148 pub fn is_demo(&self) -> bool {
1149 self.inner.is_demo
1150 }
1151
1152 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1160 self.inner.get_server_time().await
1161 }
1162
1163 #[must_use]
1167 pub fn is_initialized(&self) -> bool {
1168 self.cache_initialized.load(Ordering::Acquire)
1169 }
1170
1171 #[must_use]
1174 pub fn get_cached_symbols(&self) -> Vec<String> {
1175 self.instruments_cache
1176 .iter()
1177 .map(|entry| entry.key().to_string())
1178 .collect()
1179 }
1180
1181 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1185 for inst in instruments {
1186 self.instruments_cache
1187 .insert(inst.raw_symbol().inner(), inst);
1188 }
1189 self.cache_initialized.store(true, Ordering::Release);
1190 }
1191
1192 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1196 self.instruments_cache
1197 .insert(instrument.raw_symbol().inner(), instrument);
1198 self.cache_initialized.store(true, Ordering::Release);
1199 }
1200
1201 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1203 self.instruments_cache
1204 .get(symbol)
1205 .map(|entry| entry.value().clone())
1206 }
1207
1208 pub async fn request_account_state(
1214 &self,
1215 account_id: AccountId,
1216 ) -> anyhow::Result<AccountState> {
1217 let resp = self
1218 .inner
1219 .get_balance()
1220 .await
1221 .map_err(|e| anyhow::anyhow!(e))?;
1222
1223 let ts_init = self.generate_ts_init();
1224 let raw = resp
1225 .first()
1226 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1227 let account_state = parse_account_state(raw, account_id, ts_init)?;
1228
1229 Ok(account_state)
1230 }
1231
1232 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1245 let mut params = SetPositionModeParamsBuilder::default();
1246 params.pos_mode(position_mode);
1247 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1248
1249 match self.inner.set_position_mode(params).await {
1250 Ok(_) => Ok(()),
1251 Err(e) => {
1252 if let OKXHttpError::OkxError {
1253 error_code,
1254 message,
1255 } = &e
1256 && error_code == "50115"
1257 {
1258 tracing::warn!(
1259 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1260 );
1261 return Ok(()); }
1263 anyhow::bail!(e)
1264 }
1265 }
1266 }
1267
1268 pub async fn request_instruments(
1274 &self,
1275 instrument_type: OKXInstrumentType,
1276 instrument_family: Option<String>,
1277 ) -> anyhow::Result<Vec<InstrumentAny>> {
1278 let mut params = GetInstrumentsParamsBuilder::default();
1279 params.inst_type(instrument_type);
1280
1281 if let Some(family) = instrument_family.clone() {
1282 params.inst_family(family);
1283 }
1284
1285 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1286
1287 let resp = self
1288 .inner
1289 .get_instruments(params)
1290 .await
1291 .map_err(|e| anyhow::anyhow!(e))?;
1292
1293 let fee_rate_opt = {
1294 let fee_params = GetTradeFeeParams {
1295 inst_type: instrument_type,
1296 uly: None,
1297 inst_family: instrument_family,
1298 };
1299
1300 match self.inner.get_trade_fee(fee_params).await {
1301 Ok(rates) => rates.into_iter().next(),
1302 Err(OKXHttpError::MissingCredentials) => {
1303 tracing::debug!("Missing credentials for fee rates, using None");
1304 None
1305 }
1306 Err(e) => {
1307 tracing::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1308 None
1309 }
1310 }
1311 };
1312
1313 let ts_init = self.generate_ts_init();
1314
1315 let mut instruments: Vec<InstrumentAny> = Vec::new();
1316 for inst in &resp {
1317 if inst.state == OKXInstrumentStatus::Preopen {
1320 continue;
1321 }
1322
1323 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1328 let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1329 let (maker_str, taker_str) = if is_usdt_margined {
1330 (&fee_rate.maker_u, &fee_rate.taker_u)
1331 } else {
1332 (&fee_rate.maker, &fee_rate.taker)
1333 };
1334
1335 let maker = if !maker_str.is_empty() {
1336 Decimal::from_str(maker_str).ok().map(|v| -v)
1337 } else {
1338 None
1339 };
1340 let taker = if !taker_str.is_empty() {
1341 Decimal::from_str(taker_str).ok().map(|v| -v)
1342 } else {
1343 None
1344 };
1345
1346 (maker, taker)
1347 } else {
1348 (None, None)
1349 };
1350
1351 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1352 Ok(Some(instrument_any)) => {
1353 instruments.push(instrument_any);
1354 }
1355 Ok(None) => {
1356 }
1358 Err(e) => {
1359 tracing::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1360 }
1361 }
1362 }
1363
1364 Ok(instruments)
1365 }
1366
1367 pub async fn request_instrument(
1378 &self,
1379 instrument_id: InstrumentId,
1380 ) -> anyhow::Result<InstrumentAny> {
1381 let symbol = instrument_id.symbol.as_str();
1382 let instrument_type = okx_instrument_type_from_symbol(symbol);
1383
1384 let mut params = GetInstrumentsParamsBuilder::default();
1385 params.inst_type(instrument_type);
1386 params.inst_id(symbol);
1387
1388 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1389
1390 let resp = self
1391 .inner
1392 .get_instruments(params)
1393 .await
1394 .map_err(|e| anyhow::anyhow!(e))?;
1395
1396 let raw_inst = resp
1397 .first()
1398 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1399
1400 if raw_inst.state == OKXInstrumentStatus::Preopen {
1402 anyhow::bail!("Instrument {symbol} is in pre-open state");
1403 }
1404
1405 let fee_rate_opt = {
1406 let fee_params = GetTradeFeeParams {
1407 inst_type: instrument_type,
1408 uly: None,
1409 inst_family: None,
1410 };
1411
1412 match self.inner.get_trade_fee(fee_params).await {
1413 Ok(rates) => rates.into_iter().next(),
1414 Err(OKXHttpError::MissingCredentials) => {
1415 tracing::debug!("Missing credentials for fee rates, using None");
1416 None
1417 }
1418 Err(e) => {
1419 tracing::warn!("Failed to fetch fee rates for {symbol}: {e}");
1420 None
1421 }
1422 }
1423 };
1424
1425 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1429 let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1430 let (maker_str, taker_str) = if is_usdt_margined {
1431 (&fee_rate.maker_u, &fee_rate.taker_u)
1432 } else {
1433 (&fee_rate.maker, &fee_rate.taker)
1434 };
1435
1436 let maker = if !maker_str.is_empty() {
1437 Decimal::from_str(maker_str).ok().map(|v| -v)
1438 } else {
1439 None
1440 };
1441 let taker = if !taker_str.is_empty() {
1442 Decimal::from_str(taker_str).ok().map(|v| -v)
1443 } else {
1444 None
1445 };
1446
1447 (maker, taker)
1448 } else {
1449 (None, None)
1450 };
1451
1452 let ts_init = self.generate_ts_init();
1453 let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1454 .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1455
1456 self.cache_instrument(instrument.clone());
1457
1458 Ok(instrument)
1459 }
1460
1461 pub async fn request_mark_price(
1467 &self,
1468 instrument_id: InstrumentId,
1469 ) -> anyhow::Result<MarkPriceUpdate> {
1470 let mut params = GetMarkPriceParamsBuilder::default();
1471 params.inst_id(instrument_id.symbol.inner());
1472 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1473
1474 let resp = self
1475 .inner
1476 .get_mark_price(params)
1477 .await
1478 .map_err(|e| anyhow::anyhow!(e))?;
1479
1480 let raw = resp
1481 .first()
1482 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1483 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1484 let ts_init = self.generate_ts_init();
1485
1486 let mark_price =
1487 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1488 .map_err(|e| anyhow::anyhow!(e))?;
1489 Ok(mark_price)
1490 }
1491
1492 pub async fn request_index_price(
1498 &self,
1499 instrument_id: InstrumentId,
1500 ) -> anyhow::Result<IndexPriceUpdate> {
1501 let mut params = GetIndexTickerParamsBuilder::default();
1502 params.inst_id(instrument_id.symbol.inner());
1503 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1504
1505 let resp = self
1506 .inner
1507 .get_index_tickers(params)
1508 .await
1509 .map_err(|e| anyhow::anyhow!(e))?;
1510
1511 let raw = resp
1512 .first()
1513 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1514 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1515 let ts_init = self.generate_ts_init();
1516
1517 let index_price =
1518 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1519 .map_err(|e| anyhow::anyhow!(e))?;
1520 Ok(index_price)
1521 }
1522
/// Requests historical trades for an instrument and returns them as `TradeTick`s.
///
/// Behaviour depends on which of `start` / `end` are supplied:
/// - neither: a single request for the most recent trades (at most 100);
/// - either or both: cursor-based pagination, filtering every page against the
///   requested time window.
///
/// A `limit` of `Some(0)` is treated as "no limit". An `end` in the future is clamped
/// to the current time, and a `start` in the future yields an empty result without
/// issuing any request.
///
/// Each response page is treated as newest-first and reversed before being merged, so
/// the returned vector is expected to be ordered oldest to newest.
///
/// # Errors
///
/// Returns an error if `start >= end`, if `instrument_from_cache` fails for the
/// instrument symbol, if the request parameters fail to build, or if the HTTP request
/// fails. Individual trades that fail to parse are logged and skipped.
pub async fn request_trades(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<TradeTick>> {
    // Largest page size requested from the trades endpoint.
    const OKX_TRADES_MAX_LIMIT: u32 = 100;

    // Normalize `Some(0)` to "no limit".
    let limit = if limit == Some(0) { None } else { limit };

    if let (Some(s), Some(e)) = (start, end) {
        anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
    }

    let now = Utc::now();

    // Nothing can exist after "now".
    if let Some(s) = start
        && s > now
    {
        return Ok(Vec::new());
    }

    // Clamp a future `end` to the current time.
    let end = if let Some(e) = end
        && e > now
    {
        Some(now)
    } else {
        end
    };

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Mode {
        // No time bounds: one request for the latest trades.
        Latest,
        // Exactly one bound given: paginate and filter against that bound.
        Backward,
        // Both bounds given: paginate and filter against the window.
        Range,
    }

    let mode = match (start, end) {
        (None, None) => Mode::Latest,
        (Some(_), None) => Mode::Backward,
        (None, Some(_)) => Mode::Backward,
        (Some(_), Some(_)) => Mode::Range,
    };

    let start_ms = start.map(|s| s.timestamp_millis());
    let end_ms = end.map(|e| e.timestamp_millis());

    let ts_init = self.generate_ts_init();
    let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;

    if matches!(mode, Mode::Backward | Mode::Range) {
        // Pagination cursor: ID of the oldest trade seen on the previous page.
        let mut before_trade_id: Option<String> = None;
        let mut pages = 0usize;
        // One entry per page, pushed in request order; each page is stored oldest-first.
        let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
        // (trade_id, ts_event) pairs already accepted, used to drop repeats across pages.
        let mut seen_trades: std::collections::HashSet<(String, i64)> =
            std::collections::HashSet::new();
        let mut unique_count = 0usize;
        let mut consecutive_empty_pages = 0usize;
        const MAX_PAGES: usize = 500;
        const MAX_CONSECUTIVE_EMPTY: usize = 3;

        // With a `start` bound the start boundary ends pagination, so the default is
        // effectively unbounded (`u32::MAX`); otherwise default to a single page's worth.
        let effective_limit = if start.is_some() {
            limit.unwrap_or(u32::MAX)
        } else {
            limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
        };

        tracing::debug!(
            "Starting trades pagination: mode={:?}, start={:?}, end={:?}, limit={:?}, effective_limit={}",
            mode,
            start,
            end,
            limit,
            effective_limit
        );

        loop {
            if pages >= MAX_PAGES {
                tracing::warn!("Hit MAX_PAGES limit of {}", MAX_PAGES);
                break;
            }

            // `u32::MAX` is the "unbounded" sentinel and never terminates the loop here.
            if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
                tracing::debug!("Reached effective limit: unique_count={}", unique_count);
                break;
            }

            // Request only as many trades as are still needed, capped at the page maximum.
            let remaining = (effective_limit as usize).saturating_sub(unique_count);
            let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;

            tracing::debug!(
                "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
                pages + 1,
                before_trade_id,
                page_cap,
                unique_count
            );

            // NOTE(review): `pagination_type(1)` presumably selects trade-ID based
            // pagination; confirm against the OKX API documentation.
            let mut params_builder = GetTradesParamsBuilder::default();
            params_builder
                .inst_id(instrument_id.symbol.inner())
                .limit(page_cap)
                .pagination_type(1);

            if let Some(ref before_id) = before_trade_id {
                // The cursor is sent as the request's `after` parameter.
                // NOTE(review): presumably this returns trades older than the given ID; confirm.
                params_builder.after(before_id.clone());
            }

            let params = params_builder.build().map_err(anyhow::Error::new)?;
            let raw = self
                .inner
                .get_history_trades(params)
                .await
                .map_err(anyhow::Error::new)?;

            tracing::debug!("Received {} raw trades from API", raw.len());

            if !raw.is_empty() {
                let first_id = &raw.first().unwrap().trade_id;
                let last_id = &raw.last().unwrap().trade_id;
                tracing::debug!(
                    "Raw response trade ID range: first={} (newest), last={} (oldest)",
                    first_id,
                    last_id
                );
            }

            if raw.is_empty() {
                tracing::debug!("API returned empty page, stopping pagination");
                break;
            }

            pages += 1;

            let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
            let mut hit_start_boundary = false;
            let mut filtered_out = 0usize;
            let mut duplicates = 0usize;

            for r in &raw {
                match parse_trade_tick(
                    r,
                    instrument_id,
                    inst.price_precision(),
                    inst.size_precision(),
                    ts_init,
                ) {
                    Ok(trade) => {
                        // Compare at millisecond resolution, matching `start_ms` / `end_ms`.
                        let ts_ms = trade.ts_event.as_i64() / 1_000_000;

                        // Later than the window: skip it but keep scanning the page.
                        if let Some(e_ms) = end_ms
                            && ts_ms > e_ms
                        {
                            filtered_out += 1;
                            continue;
                        }

                        // Earlier than the window: stop scanning this page and end
                        // pagination once the cursor bookkeeping below has run.
                        if let Some(s_ms) = start_ms
                            && ts_ms < s_ms
                        {
                            hit_start_boundary = true;
                            filtered_out += 1;
                            break;
                        }

                        let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
                        if seen_trades.insert(trade_key) {
                            unique_count += 1;
                            page_trades.push(trade);
                        } else {
                            duplicates += 1;
                        }
                    }
                    // A trade that fails to parse is logged and skipped, not fatal.
                    Err(e) => tracing::error!("{e}"),
                }
            }

            tracing::debug!(
                "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
                pages,
                page_trades.len(),
                filtered_out,
                duplicates,
                hit_start_boundary
            );

            // Determine the next cursor from the last entry of this page.
            let oldest_trade_id = if !page_trades.is_empty() {
                // Take the cursor from the last accepted trade before the page is reversed.
                let oldest_id = page_trades.last().map(|t| {
                    let id = t.trade_id.to_string();
                    tracing::debug!(
                        "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
                        id,
                        t.ts_event.as_i64()
                    );
                    id
                });
                // Store the page oldest-first.
                page_trades.reverse();
                page_results.push(page_trades);
                consecutive_empty_pages = 0;
                oldest_id
            } else {
                // Pages without usable trades only count towards the stop condition once
                // at least one trade has been collected.
                if unique_count > 0 {
                    consecutive_empty_pages += 1;
                    if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
                        tracing::debug!(
                            "Stopping: {} consecutive pages with no trades in range after collecting {} trades",
                            consecutive_empty_pages,
                            unique_count
                        );
                        break;
                    }
                }
                // Nothing was kept, so advance the cursor from the raw response instead.
                raw.last().map(|t| {
                    let id = t.trade_id.to_string();
                    tracing::debug!(
                        "Setting cursor from raw response (no unique trades): oldest_id={}",
                        id
                    );
                    id
                })
            };

            // Stop if the cursor did not move, to avoid requesting the same page forever.
            if let Some(ref old_id) = before_trade_id
                && oldest_trade_id.as_ref() == Some(old_id)
            {
                break;
            }

            if oldest_trade_id.is_none() {
                break;
            }

            before_trade_id = oldest_trade_id;

            if hit_start_boundary {
                break;
            }

            // Short pause between page requests.
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        tracing::debug!(
            "Pagination complete: {} pages, {} unique trades collected",
            pages,
            unique_count
        );

        // Concatenate pages in reverse request order so the last-requested page comes first.
        let mut out: Vec<TradeTick> = Vec::new();
        for page in page_results.into_iter().rev() {
            out.extend(page);
        }

        // Final safety pass using the same (trade_id, ts_event) key as during pagination.
        let mut dedup_keys = std::collections::HashSet::new();
        let pre_dedup_len = out.len();
        out.retain(|trade| {
            dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
        });

        if out.len() < pre_dedup_len {
            tracing::debug!(
                "Removed {} duplicate trades during final dedup",
                pre_dedup_len - out.len()
            );
        }

        // When over the limit, drop entries from the front (oldest, given the ordering above).
        if let Some(lim) = limit
            && lim > 0
            && out.len() > lim as usize
        {
            let excess = out.len() - lim as usize;
            tracing::debug!("Trimming {} oldest trades to respect limit={}", excess, lim);
            out.drain(0..excess);
        }

        tracing::debug!("Returning {} trades", out.len());
        return Ok(out);
    }

    // Latest mode: a single request, capped at the page maximum.
    let req_limit = limit
        .unwrap_or(OKX_TRADES_MAX_LIMIT)
        .min(OKX_TRADES_MAX_LIMIT);
    let params = GetTradesParamsBuilder::default()
        .inst_id(instrument_id.symbol.inner())
        .limit(req_limit)
        .build()
        .map_err(anyhow::Error::new)?;

    let raw = self
        .inner
        .get_history_trades(params)
        .await
        .map_err(anyhow::Error::new)?;

    let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
    for r in &raw {
        match parse_trade_tick(
            r,
            instrument_id,
            inst.price_precision(),
            inst.size_precision(),
            ts_init,
        ) {
            Ok(trade) => trades.push(trade),
            Err(e) => tracing::error!("{e}"),
        }
    }

    // Reverse the response order (treated as newest-first elsewhere in this method).
    trades.reverse();

    // Keep only the last `lim` entries when more were returned than requested.
    if let Some(lim) = limit
        && lim > 0
        && trades.len() > lim as usize
    {
        trades.drain(0..trades.len() - lim as usize);
    }

    Ok(trades)
}
1863
/// Requests historical bars for `bar_type` by paging through the candlestick endpoints.
///
/// Only externally aggregated bar types are accepted, with second, minute, hour, day,
/// week or month aggregation. The request mode is derived from `start` / `end`:
/// - neither: `Latest`, starting from the most recent bars and paging further back;
/// - exactly one: `Backward`;
/// - both: `Range`.
///
/// A `limit` of `Some(0)` is treated as "no limit". An `end` in the future is clamped
/// to the current time, and a `start` in the future yields an empty result without
/// issuing any request.
///
/// Each response page is reversed before being merged (the code treats responses as
/// newest-first), and the accumulated result is kept in ascending `ts_event` order.
/// When more than `limit` bars were collected the result is truncated to its first
/// `limit` entries.
///
/// # Errors
///
/// Returns an error if the aggregation source is not external, if the aggregation is
/// unsupported, if `start >= end`, if `instrument_from_cache` fails for the symbol, if
/// request parameters fail to build, if an HTTP request fails, or if any candlestick
/// fails to parse.
pub async fn request_bars(
    &self,
    bar_type: BarType,
    start: Option<DateTime<Utc>>,
    mut end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<Bar>> {
    // Age threshold (in days) used by the fallback requests to choose the history endpoint.
    const HISTORY_SPLIT_DAYS: i64 = 100;
    // Upper bound on the number of non-empty pages processed.
    const MAX_PAGES_SOFT: usize = 500;

    // Normalize `Some(0)` to "no limit".
    let limit = if limit == Some(0) { None } else { limit };

    anyhow::ensure!(
        bar_type.aggregation_source() == AggregationSource::External,
        "Only EXTERNAL aggregation is supported"
    );

    if let (Some(s), Some(e)) = (start, end) {
        anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
    }

    let now = Utc::now();

    if let Some(s) = start
        && s > now
    {
        return Ok(Vec::new());
    }
    // Clamp a future `end` to the current time.
    if let Some(e) = end
        && e > now
    {
        end = Some(now);
    }

    // Build the `bar` request parameter, e.g. "1m", "4H", "1D".
    let spec = bar_type.spec();
    let step = spec.step.get();
    let bar_param = match spec.aggregation {
        BarAggregation::Second => format!("{step}s"),
        BarAggregation::Minute => format!("{step}m"),
        BarAggregation::Hour => format!("{step}H"),
        BarAggregation::Day => format!("{step}D"),
        BarAggregation::Week => format!("{step}W"),
        BarAggregation::Month => format!("{step}M"),
        a => anyhow::bail!("OKX does not support {a:?} aggregation"),
    };

    // Duration of one bar in milliseconds; a month is approximated as 30 days.
    let slot_ms: i64 = match spec.aggregation {
        BarAggregation::Second => (step as i64) * 1_000,
        BarAggregation::Minute => (step as i64) * 60_000,
        BarAggregation::Hour => (step as i64) * 3_600_000,
        BarAggregation::Day => (step as i64) * 86_400_000,
        BarAggregation::Week => (step as i64) * 7 * 86_400_000,
        BarAggregation::Month => (step as i64) * 30 * 86_400_000,
        _ => unreachable!("Unsupported aggregation should have been caught above"),
    };
    let slot_ns: i64 = slot_ms * 1_000_000;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Mode {
        // No time bounds.
        Latest,
        // Exactly one of `start` / `end` given.
        Backward,
        // Both `start` and `end` given.
        Range,
    }

    let mode = match (start, end) {
        (None, None) => Mode::Latest,
        (Some(_), None) => Mode::Backward,
        (None, Some(_)) => Mode::Backward,
        (Some(_), Some(_)) => Mode::Range,
    };

    // Exact bounds in nanoseconds, used for filtering parsed bars.
    let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
    let end_ns = end.and_then(|e| e.timestamp_nanos_opt());

    // Request cursors in milliseconds: `start` rounded down and `end` rounded up to a
    // multiple of the bar duration.
    let start_ms = start.map(|s| {
        let ms = s.timestamp_millis();
        if slot_ms > 0 {
            (ms / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let end_ms = end.map(|e| {
        let ms = e.timestamp_millis();
        if slot_ms > 0 {
            ((ms + slot_ms - 1) / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let now_ms = now.timestamp_millis();

    let symbol = bar_type.instrument_id().symbol;
    let inst = self.instrument_from_cache(symbol.inner())?;

    let mut out: Vec<Bar> = Vec::new();
    let mut pages = 0usize;

    // Initial cursors per mode. NOTE(review): the exact meaning of the `after` /
    // `before` request parameters is defined by the OKX API; confirm there.
    let mut after_ms: Option<i64> = match mode {
        Mode::Range => end_ms.or(Some(now_ms)),
        _ => None,
    };
    let mut before_ms: Option<i64> = match mode {
        Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
        Mode::Range => start_ms,
        Mode::Latest => None,
    };

    // In Range mode new pages are initially merged in front of the accumulated bars.
    let mut forward_prepend_mode = matches!(mode, Mode::Range);

    // Keep the `before` cursor at least one bar (minimum 60 s) behind the current time.
    if matches!(mode, Mode::Backward | Mode::Range)
        && let Some(b) = before_ms
    {
        let buffer_ms = slot_ms.max(60_000);
        if b >= now_ms.saturating_sub(buffer_ms) {
            before_ms = Some(now_ms.saturating_sub(buffer_ms));
        }
    }

    let mut have_latest_first_page = false;
    // Consecutive iterations that added no bars; the loop gives up at three.
    let mut progressless_loops = 0u8;

    loop {
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if pages >= MAX_PAGES_SOFT {
            break;
        }

        // Choose the endpoint from the age of the active cursor: a pivot two or more
        // whole hours old uses the history endpoint with a smaller page ceiling.
        let pivot_ms = if let Some(a) = after_ms {
            a
        } else if let Some(b) = before_ms {
            b
        } else {
            now_ms
        };
        let age_ms = now_ms.saturating_sub(pivot_ms);
        let age_hours = age_ms / (60 * 60 * 1000);
        let using_history = age_hours > 1;
        let page_ceiling = if using_history { 100 } else { 300 };
        // Request only what is still needed, capped by the endpoint's page ceiling.
        let remaining = limit
            .filter(|&l| l > 0)
            .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
        let page_cap = remaining.min(page_ceiling);

        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(page_cap as u32);

        // Tracks whether this request carried a `before` cursor; used when merging below.
        let mut req_used_before = false;

        match mode {
            Mode::Latest => {
                // The first Latest request carries no cursor.
                if have_latest_first_page && let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
            Mode::Backward => {
                if let Some(b) = before_ms {
                    // The `before_ms` cursor value is sent as the `after` parameter here.
                    // NOTE(review): presumably `after` selects bars older than the timestamp; confirm.
                    p.after_ms(b);
                }
            }
            Mode::Range => {
                if let Some(a) = after_ms {
                    p.after_ms(a);
                }
                if let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
        }

        let params = p.build().map_err(anyhow::Error::new)?;

        let mut raw = if using_history {
            self.inner
                .get_history_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        } else {
            self.inner
                .get_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        };

        if raw.is_empty() {
            // Latest mode, after the first page: retry the same cursor against the
            // history endpoint; if that is empty too, jump the cursor back one page.
            if matches!(mode, Mode::Latest)
                && have_latest_first_page
                && !using_history
                && let Some(b) = before_ms
            {
                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32);
                p2.before_ms(b);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                let raw2 = self
                    .inner
                    .get_history_candles(params2)
                    .await
                    .map_err(anyhow::Error::new)?;
                if !raw2.is_empty() {
                    raw = raw2;
                } else {
                    let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
                    before_ms = Some(b.saturating_sub(jump));
                    progressless_loops = progressless_loops.saturating_add(1);
                    if progressless_loops >= 3 {
                        break;
                    }
                    continue;
                }
            }

            // Range mode, after at least one page: step one page-width back from the
            // `after` cursor and retry with a `before` cursor only.
            if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
                let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
                let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);

                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32)
                    .before_ms(pivot_back);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                // This fallback picks the endpoint by age in days rather than hours.
                let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
                    > HISTORY_SPLIT_DAYS
                {
                    self.inner.get_history_candles(params2).await
                } else {
                    self.inner.get_candles(params2).await
                }
                .map_err(anyhow::Error::new)?;
                if raw2.is_empty() {
                    break;
                } else {
                    raw = raw2;
                    forward_prepend_mode = true;
                    req_used_before = true;
                }
            }

            // Latest mode, very first request empty: move the cursor beyond the
            // history split and try again.
            if raw.is_empty()
                && matches!(mode, Mode::Latest)
                && !have_latest_first_page
                && !using_history
            {
                let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
                before_ms = Some(now_ms.saturating_sub(jump_days_ms));
                have_latest_first_page = true;
                continue;
            }

            // No fallback produced data.
            if raw.is_empty() {
                break;
            }
        }
        pages += 1;

        // Parse the page; unlike a skipped record, a parse failure aborts the request.
        let ts_init = self.generate_ts_init();
        let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
        for r in &raw {
            page.push(parse_candlestick(
                r,
                bar_type,
                inst.price_precision(),
                inst.size_precision(),
                ts_init,
            )?);
        }
        // Reverse the response order so `first()` is the oldest bar of the page.
        page.reverse();

        let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
        let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);

        // Filter the page against the requested window. The first Range page accepts
        // bars up to two bar-durations before `start`; later pages use the exact bounds.
        let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
            && out.is_empty()
            && pages < 2
        {
            let tolerance_ns = slot_ns * 2;
            if !page.is_empty() {
                tracing::debug!(
                    "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
                    page.len(),
                    page.first().unwrap().ts_event.as_i64() / 1_000_000,
                    page.last().unwrap().ts_event.as_i64() / 1_000_000,
                    start_ms,
                    end_ms
                );
            }

            let result: Vec<Bar> = page
                .clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after =
                        start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();

            result
        } else {
            page.clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect()
        };

        // In forward Range paging, stop once a whole page lies more than two
        // bar-durations before `start`.
        if !page.is_empty() && filtered.is_empty() {
            if matches!(mode, Mode::Range)
                && !forward_prepend_mode
                && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
                && newest_ms < start_ms.saturating_sub(slot_ms * 2)
            {
                break;
            }
        }

        // Number of bars this page actually added to `out`.
        let contribution;

        if out.is_empty() {
            contribution = filtered.len();
            out = filtered;
        } else {
            match mode {
                Mode::Backward | Mode::Latest => {
                    // Keep only bars strictly older than what is already held, then prepend.
                    if let Some(first) = out.first() {
                        filtered.retain(|b| b.ts_event < first.ts_event);
                    }
                    contribution = filtered.len();
                    if contribution != 0 {
                        let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                        new_out.extend_from_slice(&filtered);
                        new_out.extend_from_slice(&out);
                        out = new_out;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        // Prepend: keep only bars strictly older than the current first bar.
                        if let Some(first) = out.first() {
                            filtered.retain(|b| b.ts_event < first.ts_event);
                        }
                        contribution = filtered.len();
                        if contribution != 0 {
                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                            new_out.extend_from_slice(&filtered);
                            new_out.extend_from_slice(&out);
                            out = new_out;
                        }
                    } else {
                        // Append: keep only bars strictly newer than the current last bar.
                        if let Some(last) = out.last() {
                            filtered.retain(|b| b.ts_event > last.ts_event);
                        }
                        contribution = filtered.len();
                        out.extend(filtered);
                    }
                }
            }
        }

        // Nothing gained: move the `before` cursor back by one page-width.
        if contribution == 0
            && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
            && let Some(b) = before_ms
        {
            let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
            let new_b = b.saturating_sub(jump);
            if new_b != b {
                before_ms = Some(new_b);
            }
        }

        if contribution == 0 {
            progressless_loops = progressless_loops.saturating_add(1);
            if progressless_loops >= 3 {
                break;
            }
        } else {
            progressless_loops = 0;

            // Advance the cursors from the bounds of the page just processed.
            match mode {
                Mode::Latest | Mode::Backward => {
                    if let Some(oldest) = page_oldest_ms {
                        before_ms = Some(oldest.saturating_sub(1));
                        have_latest_first_page = true;
                    } else {
                        break;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        if let Some(oldest) = page_oldest_ms {
                            let jump_back = slot_ms.max(60_000);
                            before_ms = Some(oldest.saturating_sub(jump_back));
                            after_ms = None;
                        } else {
                            break;
                        }
                    } else if let Some(newest) = page_newest_ms {
                        after_ms = Some(newest.saturating_add(1));
                        before_ms = None;
                    } else {
                        break;
                    }
                }
            }
        }

        // Termination checks: limit reached, `end` reached, or `start` reached.
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if let Some(ens) = end_ns
            && let Some(last) = out.last()
            && last.ts_event.as_i64() >= ens
        {
            break;
        }
        if let Some(sns) = start_ns
            && let Some(first) = out.first()
            && (matches!(mode, Mode::Backward) || forward_prepend_mode)
            && first.ts_event.as_i64() <= sns
        {
            if matches!(mode, Mode::Range) {
                // The start is covered; if the newest bar is still short of `end`,
                // switch to appending pages after the last collected bar.
                if let Some(ens) = end_ns
                    && let Some(last) = out.last()
                {
                    let last_ts = last.ts_event.as_i64();
                    if last_ts < ens {
                        forward_prepend_mode = false;
                        after_ms = Some((last_ts / 1_000_000).saturating_add(1));
                        before_ms = None;
                        continue;
                    }
                }
            }
            break;
        }

        // Short pause between page requests.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }

    // Range mode produced nothing: make one last request of up to 300 bars with a
    // `before` cursor at `end`, and filter with the exact bounds.
    if out.is_empty() && matches!(mode, Mode::Range) {
        let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
        let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(300)
            .before_ms(pivot);
        let params = p.build().map_err(anyhow::Error::new)?;
        let raw = if hist {
            self.inner.get_history_candles(params).await
        } else {
            self.inner.get_candles(params).await
        }
        .map_err(anyhow::Error::new)?;
        if !raw.is_empty() {
            let ts_init = self.generate_ts_init();
            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
            for r in &raw {
                page.push(parse_candlestick(
                    r,
                    bar_type,
                    inst.price_precision(),
                    inst.size_precision(),
                    ts_init,
                )?);
            }
            page.reverse();
            out = page
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();
        }
    }

    // Drop any trailing bars later than `end`.
    if let Some(ens) = end_ns {
        while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
            out.pop();
        }
    }

    // After forward Range paging, drop leading bars more than one bar-duration before `start`.
    if matches!(mode, Mode::Range)
        && !forward_prepend_mode
        && let Some(sns) = start_ns
    {
        let lower = sns.saturating_sub(slot_ns);
        while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
            out.remove(0);
        }
    }

    // Enforce the limit by keeping the first `lim` bars.
    if let Some(lim) = limit
        && lim > 0
        && out.len() > lim as usize
    {
        out.truncate(lim as usize);
    }

    Ok(out)
}
2492
2493 #[allow(clippy::too_many_arguments)]
2504 pub async fn request_order_status_reports(
2505 &self,
2506 account_id: AccountId,
2507 instrument_type: Option<OKXInstrumentType>,
2508 instrument_id: Option<InstrumentId>,
2509 start: Option<DateTime<Utc>>,
2510 end: Option<DateTime<Utc>>,
2511 open_only: bool,
2512 limit: Option<u32>,
2513 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2514 let mut history_params = GetOrderHistoryParamsBuilder::default();
2515
2516 let instrument_type = if let Some(instrument_type) = instrument_type {
2517 instrument_type
2518 } else {
2519 let instrument_id = instrument_id.ok_or_else(|| {
2520 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2521 })?;
2522 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2523 okx_instrument_type(&instrument)?
2524 };
2525
2526 history_params.inst_type(instrument_type);
2527
2528 if let Some(instrument_id) = instrument_id.as_ref() {
2529 history_params.inst_id(instrument_id.symbol.inner().to_string());
2530 }
2531
2532 if let Some(limit) = limit {
2533 history_params.limit(limit);
2534 }
2535
2536 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2537
2538 let mut pending_params = GetOrderListParamsBuilder::default();
2539 pending_params.inst_type(instrument_type);
2540
2541 if let Some(instrument_id) = instrument_id.as_ref() {
2542 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2543 }
2544
2545 if let Some(limit) = limit {
2546 pending_params.limit(limit);
2547 }
2548
2549 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2550
2551 let combined_resp = if open_only {
2552 self.inner
2554 .get_orders_pending(pending_params)
2555 .await
2556 .map_err(|e| anyhow::anyhow!(e))?
2557 } else {
2558 let (history_resp, pending_resp) = tokio::try_join!(
2560 self.inner.get_orders_history(history_params),
2561 self.inner.get_orders_pending(pending_params)
2562 )
2563 .map_err(|e| anyhow::anyhow!(e))?;
2564
2565 let mut combined_resp = history_resp;
2567 combined_resp.extend(pending_resp);
2568 combined_resp
2569 };
2570
2571 let start_ns = start.map(UnixNanos::from);
2573 let end_ns = end.map(UnixNanos::from);
2574
2575 let ts_init = self.generate_ts_init();
2576 let mut reports = Vec::with_capacity(combined_resp.len());
2577
2578 let mut seen: AHashSet<String> = AHashSet::new();
2580
2581 for order in combined_resp {
2582 let seen_key = if !order.cl_ord_id.is_empty() {
2583 order.cl_ord_id.as_str().to_string()
2584 } else if let Some(algo_cl_ord_id) = order
2585 .algo_cl_ord_id
2586 .as_ref()
2587 .filter(|value| !value.as_str().is_empty())
2588 {
2589 algo_cl_ord_id.as_str().to_string()
2590 } else if let Some(algo_id) = order
2591 .algo_id
2592 .as_ref()
2593 .filter(|value| !value.as_str().is_empty())
2594 {
2595 algo_id.as_str().to_string()
2596 } else {
2597 order.ord_id.as_str().to_string()
2598 };
2599
2600 if !seen.insert(seen_key) {
2601 continue; }
2603
2604 let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2605 tracing::debug!(
2606 symbol = %order.inst_id,
2607 "Skipping order report for instrument not in cache"
2608 );
2609 continue;
2610 };
2611
2612 let report = match parse_order_status_report(
2613 &order,
2614 account_id,
2615 inst.id(),
2616 inst.price_precision(),
2617 inst.size_precision(),
2618 ts_init,
2619 ) {
2620 Ok(report) => report,
2621 Err(e) => {
2622 tracing::error!("Failed to parse order status report: {e}");
2623 continue;
2624 }
2625 };
2626
2627 if let Some(start_ns) = start_ns
2628 && report.ts_last < start_ns
2629 {
2630 continue;
2631 }
2632 if let Some(end_ns) = end_ns
2633 && report.ts_last > end_ns
2634 {
2635 continue;
2636 }
2637
2638 reports.push(report);
2639 }
2640
2641 Ok(reports)
2642 }
2643
2644 pub async fn request_fill_reports(
2654 &self,
2655 account_id: AccountId,
2656 instrument_type: Option<OKXInstrumentType>,
2657 instrument_id: Option<InstrumentId>,
2658 start: Option<DateTime<Utc>>,
2659 end: Option<DateTime<Utc>>,
2660 limit: Option<u32>,
2661 ) -> anyhow::Result<Vec<FillReport>> {
2662 let mut params = GetTransactionDetailsParamsBuilder::default();
2663
2664 let instrument_type = if let Some(instrument_type) = instrument_type {
2665 instrument_type
2666 } else {
2667 let instrument_id = instrument_id.ok_or_else(|| {
2668 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2669 })?;
2670 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2671 okx_instrument_type(&instrument)?
2672 };
2673
2674 params.inst_type(instrument_type);
2675
2676 if let Some(instrument_id) = instrument_id {
2677 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2678 let instrument_type = okx_instrument_type(&instrument)?;
2679 params.inst_type(instrument_type);
2680 params.inst_id(instrument_id.symbol.inner().to_string());
2681 }
2682
2683 if let Some(limit) = limit {
2684 params.limit(limit);
2685 }
2686
2687 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2688
2689 let resp = self
2690 .inner
2691 .get_fills(params)
2692 .await
2693 .map_err(|e| anyhow::anyhow!(e))?;
2694
2695 let start_ns = start.map(UnixNanos::from);
2697 let end_ns = end.map(UnixNanos::from);
2698
2699 let ts_init = self.generate_ts_init();
2700 let mut reports = Vec::with_capacity(resp.len());
2701
2702 for detail in resp {
2703 if detail.fill_sz.is_empty() {
2705 continue;
2706 }
2707 if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2708 if qty <= 0.0 {
2709 continue;
2710 }
2711 } else {
2712 continue;
2714 }
2715
2716 let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2717 tracing::debug!(
2718 symbol = %detail.inst_id,
2719 "Skipping fill report for instrument not in cache"
2720 );
2721 continue;
2722 };
2723
2724 let report = match parse_fill_report(
2725 detail,
2726 account_id,
2727 inst.id(),
2728 inst.price_precision(),
2729 inst.size_precision(),
2730 ts_init,
2731 ) {
2732 Ok(report) => report,
2733 Err(e) => {
2734 tracing::error!("Failed to parse fill report: {e}");
2735 continue;
2736 }
2737 };
2738
2739 if let Some(start_ns) = start_ns
2740 && report.ts_event < start_ns
2741 {
2742 continue;
2743 }
2744
2745 if let Some(end_ns) = end_ns
2746 && report.ts_event > end_ns
2747 {
2748 continue;
2749 }
2750
2751 reports.push(report);
2752 }
2753
2754 Ok(reports)
2755 }
2756
2757 pub async fn request_position_status_reports(
2784 &self,
2785 account_id: AccountId,
2786 instrument_type: Option<OKXInstrumentType>,
2787 instrument_id: Option<InstrumentId>,
2788 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2789 let mut params = GetPositionsParamsBuilder::default();
2790
2791 let instrument_type = if let Some(instrument_type) = instrument_type {
2792 instrument_type
2793 } else {
2794 let instrument_id = instrument_id.ok_or_else(|| {
2795 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2796 })?;
2797 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2798 okx_instrument_type(&instrument)?
2799 };
2800
2801 params.inst_type(instrument_type);
2802
2803 instrument_id
2804 .as_ref()
2805 .map(|i| params.inst_id(i.symbol.inner()));
2806
2807 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2808
2809 let resp = self
2810 .inner
2811 .get_positions(params)
2812 .await
2813 .map_err(|e| anyhow::anyhow!(e))?;
2814
2815 let ts_init = self.generate_ts_init();
2816 let mut reports = Vec::with_capacity(resp.len());
2817
2818 for position in resp {
2819 let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2820 tracing::debug!(
2821 symbol = %position.inst_id,
2822 "Skipping position report for instrument not in cache"
2823 );
2824 continue;
2825 };
2826
2827 match parse_position_status_report(
2828 position,
2829 account_id,
2830 inst.id(),
2831 inst.size_precision(),
2832 ts_init,
2833 ) {
2834 Ok(report) => reports.push(report),
2835 Err(e) => {
2836 tracing::error!("Failed to parse position status report: {e}");
2837 continue;
2838 }
2839 };
2840 }
2841
2842 Ok(reports)
2843 }
2844
2845 pub async fn request_spot_margin_position_reports(
2860 &self,
2861 account_id: AccountId,
2862 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2863 let accounts = self
2864 .inner
2865 .get_balance()
2866 .await
2867 .map_err(|e| anyhow::anyhow!(e))?;
2868
2869 let ts_init = self.generate_ts_init();
2870 let mut reports = Vec::new();
2871
2872 for account in accounts {
2873 for balance in account.details {
2874 let ccy_str = balance.ccy.as_str();
2875
2876 let potential_symbols = [
2878 format!("{ccy_str}-USDT"),
2879 format!("{ccy_str}-USD"),
2880 format!("{ccy_str}-USDC"),
2881 ];
2882
2883 let instrument_result = potential_symbols.iter().find_map(|symbol| {
2884 self.instrument_from_cache(Ustr::from(symbol))
2885 .ok()
2886 .map(|inst| (inst.id(), inst.size_precision()))
2887 });
2888
2889 let (instrument_id, size_precision) = match instrument_result {
2890 Some((id, prec)) => (id, prec),
2891 None => {
2892 tracing::debug!(
2893 "Skipping balance for {} - no matching instrument in cache",
2894 ccy_str
2895 );
2896 continue;
2897 }
2898 };
2899
2900 match parse_spot_margin_position_from_balance(
2901 &balance,
2902 account_id,
2903 instrument_id,
2904 size_precision,
2905 ts_init,
2906 ) {
2907 Ok(Some(report)) => reports.push(report),
2908 Ok(None) => {} Err(e) => {
2910 tracing::error!(
2911 "Failed to parse spot margin position from balance for {}: {e}",
2912 ccy_str
2913 );
2914 continue;
2915 }
2916 };
2917 }
2918 }
2919
2920 Ok(reports)
2921 }
2922
2923 pub async fn place_algo_order(
2933 &self,
2934 request: OKXPlaceAlgoOrderRequest,
2935 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2936 let body =
2937 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2938
2939 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2940 .inner
2941 .send_request::<_, ()>(
2942 Method::POST,
2943 "/api/v5/trade/order-algo",
2944 None,
2945 Some(body),
2946 true,
2947 )
2948 .await?;
2949
2950 resp.into_iter()
2951 .next()
2952 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2953 }
2954
2955 pub async fn cancel_algo_order(
2965 &self,
2966 request: OKXCancelAlgoOrderRequest,
2967 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2968 let body =
2971 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2972
2973 let resp: Vec<OKXCancelAlgoOrderResponse> = self
2974 .inner
2975 .send_request::<_, ()>(
2976 Method::POST,
2977 "/api/v5/trade/cancel-algos",
2978 None,
2979 Some(body),
2980 true,
2981 )
2982 .await?;
2983
2984 resp.into_iter()
2985 .next()
2986 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2987 }
2988
2989 #[allow(clippy::too_many_arguments)]
2998 pub async fn place_algo_order_with_domain_types(
2999 &self,
3000 instrument_id: InstrumentId,
3001 td_mode: OKXTradeMode,
3002 client_order_id: ClientOrderId,
3003 order_side: OrderSide,
3004 order_type: OrderType,
3005 quantity: Quantity,
3006 trigger_price: Price,
3007 trigger_type: Option<TriggerType>,
3008 limit_price: Option<Price>,
3009 reduce_only: Option<bool>,
3010 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3011 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3012 return Err(OKXHttpError::ValidationError(
3013 "Invalid order side".to_string(),
3014 ));
3015 }
3016 let okx_side: OKXSide = order_side.into();
3017
3018 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3020
3021 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3023 limit_price.map(|p| p.to_string())
3024 } else {
3025 Some("-1".to_string())
3027 };
3028
3029 let request = OKXPlaceAlgoOrderRequest {
3030 inst_id: instrument_id.symbol.as_str().to_string(),
3031 td_mode,
3032 side: okx_side,
3033 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
3035 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3036 trigger_px: Some(trigger_price.to_string()),
3037 order_px,
3038 trigger_px_type: Some(trigger_px_type_enum),
3039 tgt_ccy: None, pos_side: None, close_position: None,
3042 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3043 reduce_only,
3044 };
3045
3046 self.place_algo_order(request).await
3047 }
3048
3049 pub async fn cancel_algo_order_with_domain_types(
3058 &self,
3059 instrument_id: InstrumentId,
3060 algo_id: String,
3061 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3062 let request = OKXCancelAlgoOrderRequest {
3063 inst_id: instrument_id.symbol.to_string(),
3064 algo_id: Some(algo_id),
3065 algo_cl_ord_id: None,
3066 };
3067
3068 self.cancel_algo_order(request).await
3069 }
3070
3071 #[allow(clippy::too_many_arguments)]
3077 pub async fn request_algo_order_status_reports(
3078 &self,
3079 account_id: AccountId,
3080 instrument_type: Option<OKXInstrumentType>,
3081 instrument_id: Option<InstrumentId>,
3082 algo_id: Option<String>,
3083 algo_client_order_id: Option<ClientOrderId>,
3084 state: Option<OKXOrderStatus>,
3085 limit: Option<u32>,
3086 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3087 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3088
3089 let inst_type = if let Some(inst_type) = instrument_type {
3090 inst_type
3091 } else if let Some(inst_id) = instrument_id {
3092 let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3093 let inst_type = okx_instrument_type(&instrument)?;
3094 instruments_cache.insert(inst_id.symbol.inner(), instrument);
3095 inst_type
3096 } else {
3097 anyhow::bail!("instrument_type or instrument_id required for algo order query")
3098 };
3099
3100 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3101 params_builder.inst_type(inst_type);
3102 if let Some(inst_id) = instrument_id {
3103 params_builder.inst_id(inst_id.symbol.inner().to_string());
3104 }
3105 if let Some(algo_id) = algo_id.as_ref() {
3106 params_builder.algo_id(algo_id.clone());
3107 }
3108 if let Some(client_order_id) = algo_client_order_id.as_ref() {
3109 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3110 }
3111 if let Some(state) = state {
3112 params_builder.state(state);
3113 }
3114 if let Some(limit) = limit {
3115 params_builder.limit(limit);
3116 }
3117
3118 let params = params_builder
3119 .build()
3120 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3121
3122 let ts_init = self.generate_ts_init();
3123 let mut reports = Vec::new();
3124 let mut seen: AHashSet<(String, String)> = AHashSet::new();
3125
3126 let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3127 Ok(result) => result,
3128 Err(OKXHttpError::UnexpectedStatus { status, .. })
3129 if status == StatusCode::NOT_FOUND =>
3130 {
3131 Vec::new()
3132 }
3133 Err(e) => return Err(e.into()),
3134 };
3135 self.collect_algo_reports(
3136 account_id,
3137 &pending,
3138 &mut instruments_cache,
3139 ts_init,
3140 &mut seen,
3141 &mut reports,
3142 )
3143 .await?;
3144
3145 let history = match self.inner.get_order_algo_history(params).await {
3146 Ok(result) => result,
3147 Err(OKXHttpError::UnexpectedStatus { status, .. })
3148 if status == StatusCode::NOT_FOUND =>
3149 {
3150 Vec::new()
3151 }
3152 Err(e) => return Err(e.into()),
3153 };
3154 self.collect_algo_reports(
3155 account_id,
3156 &history,
3157 &mut instruments_cache,
3158 ts_init,
3159 &mut seen,
3160 &mut reports,
3161 )
3162 .await?;
3163
3164 Ok(reports)
3165 }
3166
3167 pub async fn request_algo_order_status_report(
3173 &self,
3174 account_id: AccountId,
3175 instrument_id: InstrumentId,
3176 algo_client_order_id: ClientOrderId,
3177 ) -> anyhow::Result<Option<OrderStatusReport>> {
3178 let reports = self
3179 .request_algo_order_status_reports(
3180 account_id,
3181 None,
3182 Some(instrument_id),
3183 None,
3184 Some(algo_client_order_id),
3185 None,
3186 Some(50_u32),
3187 )
3188 .await?;
3189
3190 Ok(reports.into_iter().next())
3191 }
3192
3193 pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3195 &self.inner
3196 }
3197
3198 async fn collect_algo_reports(
3199 &self,
3200 account_id: AccountId,
3201 orders: &[OKXOrderAlgo],
3202 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3203 ts_init: UnixNanos,
3204 seen: &mut AHashSet<(String, String)>,
3205 reports: &mut Vec<OrderStatusReport>,
3206 ) -> anyhow::Result<()> {
3207 for order in orders {
3208 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3209 if !seen.insert(key) {
3210 continue;
3211 }
3212
3213 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3214 instrument.clone()
3215 } else {
3216 let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3217 tracing::debug!(
3218 symbol = %order.inst_id,
3219 "Skipping algo order report for instrument not in cache"
3220 );
3221 continue;
3222 };
3223 instruments_cache.insert(order.inst_id, instrument.clone());
3224 instrument
3225 };
3226
3227 match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3228 Ok(report) => reports.push(report),
3229 Err(e) => {
3230 tracing::error!("Failed to parse algo order report: {e}");
3231 }
3232 }
3233 }
3234
3235 Ok(())
3236 }
3237}
3238
3239fn parse_http_algo_order(
3240 order: &OKXOrderAlgo,
3241 account_id: AccountId,
3242 instrument: &InstrumentAny,
3243 ts_init: UnixNanos,
3244) -> anyhow::Result<OrderStatusReport> {
3245 let ord_px = if order.ord_px.is_empty() {
3246 "-1".to_string()
3247 } else {
3248 order.ord_px.clone()
3249 };
3250
3251 let reduce_only = if order.reduce_only.is_empty() {
3252 "false".to_string()
3253 } else {
3254 order.reduce_only.clone()
3255 };
3256
3257 let msg = OKXAlgoOrderMsg {
3258 algo_id: order.algo_id.clone(),
3259 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3260 cl_ord_id: order.cl_ord_id.clone(),
3261 ord_id: order.ord_id.clone(),
3262 inst_id: order.inst_id,
3263 inst_type: order.inst_type,
3264 ord_type: order.ord_type,
3265 state: order.state,
3266 side: order.side,
3267 pos_side: order.pos_side,
3268 sz: order.sz.clone(),
3269 trigger_px: order.trigger_px.clone(),
3270 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3271 ord_px,
3272 td_mode: order.td_mode,
3273 lever: order.lever.clone(),
3274 reduce_only,
3275 actual_px: order.actual_px.clone(),
3276 actual_sz: order.actual_sz.clone(),
3277 notional_usd: order.notional_usd.clone(),
3278 c_time: order.c_time,
3279 u_time: order.u_time,
3280 trigger_time: order.trigger_time.clone(),
3281 tag: order.tag.clone(),
3282 };
3283
3284 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3285}