1use std::{
37 collections::HashMap,
38 fmt::Debug,
39 num::NonZeroU32,
40 str::FromStr,
41 sync::{
42 Arc, LazyLock,
43 atomic::{AtomicBool, Ordering},
44 },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56 events::AccountState,
57 identifiers::{AccountId, ClientOrderId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59 reports::{FillReport, OrderStatusReport, PositionStatusReport},
60 types::{Price, Quantity},
61};
62use nautilus_network::{
63 http::{HttpClient, Method, StatusCode, USER_AGENT},
64 ratelimiter::quota::Quota,
65 retry::{RetryConfig, RetryManager},
66};
67use rust_decimal::Decimal;
68use serde::{Deserialize, Serialize, de::DeserializeOwned};
69use tokio_util::sync::CancellationToken;
70use ustr::Ustr;
71
72use super::{
73 error::OKXHttpError,
74 models::{
75 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
76 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
77 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
78 OKXTransactionDetail,
79 },
80 query::{
81 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
82 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
83 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
84 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
85 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
86 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
87 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
88 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
89 },
90};
91use crate::{
92 common::{
93 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
94 credential::Credential,
95 enums::{
96 OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
97 OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
98 },
99 models::OKXInstrument,
100 parse::{
101 okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
102 parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
103 parse_mark_price_update, parse_order_status_report, parse_position_status_report,
104 parse_spot_margin_position_from_balance, parse_trade_tick,
105 },
106 },
107 http::{
108 models::{OKXCandlestick, OKXTrade},
109 query::GetOrderParams,
110 },
111 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
112};
113
/// Top-level response `code` value that the client treats as a successful OKX API call.
const OKX_SUCCESS_CODE: &str = "0";
115
116fn resolve_okx_error_message(response_body: &[u8], top_level_msg: &str) -> String {
117 let message = top_level_msg.trim();
118 if !message.is_empty() {
119 return message.to_string();
120 }
121
122 if let Ok(payload) = serde_json::from_slice::<serde_json::Value>(response_body)
123 && let Some(first_item) = payload
124 .get("data")
125 .and_then(serde_json::Value::as_array)
126 .and_then(|items| items.first())
127 {
128 if let Some(s_msg) = first_item.get("sMsg").and_then(serde_json::Value::as_str) {
129 let s_msg = s_msg.trim();
130 if !s_msg.is_empty() {
131 return s_msg.to_string();
132 }
133 }
134
135 if let Some(s_code) = first_item.get("sCode").and_then(serde_json::Value::as_str) {
136 let s_code = s_code.trim();
137 if !s_code.is_empty() {
138 return s_code.to_string();
139 }
140 }
141 }
142
143 String::new()
144}
145
146pub static OKX_REST_QUOTA: LazyLock<Quota> =
155 LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
156
/// Rate-limiter key attached to every request, regardless of the route.
const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
158
159#[derive(Debug, Serialize, Deserialize)]
161pub struct OKXResponse<T> {
162 pub code: String,
164 pub msg: String,
166 pub data: Vec<T>,
168}
169
170pub struct OKXRawHttpClient {
176 base_url: String,
177 client: HttpClient,
178 credential: Option<Credential>,
179 retry_manager: RetryManager<OKXHttpError>,
180 cancellation_token: CancellationToken,
181 is_demo: bool,
182}
183
184impl Default for OKXRawHttpClient {
185 fn default() -> Self {
186 Self::new(None, Some(60), None, None, None, false, None)
187 .expect("Failed to create default OKXRawHttpClient")
188 }
189}
190
191impl Debug for OKXRawHttpClient {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 let credential = self.credential.as_ref().map(|_| "<redacted>");
194 f.debug_struct(stringify!(OKXRawHttpClient))
195 .field("base_url", &self.base_url)
196 .field("credential", &credential)
197 .finish_non_exhaustive()
198 }
199}
200
201impl OKXRawHttpClient {
202 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
203 vec![
204 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
205 (
206 "okx:/api/v5/account/balance".to_string(),
207 Quota::per_second(NonZeroU32::new(5).unwrap()),
208 ),
209 (
210 "okx:/api/v5/public/instruments".to_string(),
211 Quota::per_second(NonZeroU32::new(10).unwrap()),
212 ),
213 (
214 "okx:/api/v5/market/candles".to_string(),
215 Quota::per_second(NonZeroU32::new(50).unwrap()),
216 ),
217 (
218 "okx:/api/v5/market/history-candles".to_string(),
219 Quota::per_second(NonZeroU32::new(20).unwrap()),
220 ),
221 (
222 "okx:/api/v5/market/history-trades".to_string(),
223 Quota::per_second(NonZeroU32::new(30).unwrap()),
224 ),
225 (
226 "okx:/api/v5/trade/order".to_string(),
227 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
229 (
230 "okx:/api/v5/trade/orders-pending".to_string(),
231 Quota::per_second(NonZeroU32::new(20).unwrap()),
232 ),
233 (
234 "okx:/api/v5/trade/orders-history".to_string(),
235 Quota::per_second(NonZeroU32::new(20).unwrap()),
236 ),
237 (
238 "okx:/api/v5/trade/fills".to_string(),
239 Quota::per_second(NonZeroU32::new(30).unwrap()),
240 ),
241 (
242 "okx:/api/v5/trade/order-algo".to_string(),
243 Quota::per_second(NonZeroU32::new(10).unwrap()),
244 ),
245 (
246 "okx:/api/v5/trade/cancel-algos".to_string(),
247 Quota::per_second(NonZeroU32::new(10).unwrap()),
248 ),
249 ]
250 }
251
252 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
253 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
254 let route = format!("okx:{normalized}");
255
256 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
257 }
258
259 pub fn cancel_all_requests(&self) {
261 self.cancellation_token.cancel();
262 }
263
264 pub fn cancellation_token(&self) -> &CancellationToken {
266 &self.cancellation_token
267 }
268
269 pub fn new(
279 base_url: Option<String>,
280 timeout_secs: Option<u64>,
281 max_retries: Option<u32>,
282 retry_delay_ms: Option<u64>,
283 retry_delay_max_ms: Option<u64>,
284 is_demo: bool,
285 proxy_url: Option<String>,
286 ) -> Result<Self, OKXHttpError> {
287 let retry_config = RetryConfig {
288 max_retries: max_retries.unwrap_or(3),
289 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
290 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
291 backoff_factor: 2.0,
292 jitter_ms: 1000,
293 operation_timeout_ms: Some(60_000),
294 immediate_first: false,
295 max_elapsed_ms: Some(180_000),
296 };
297
298 let retry_manager = RetryManager::new(retry_config);
299
300 Ok(Self {
301 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
302 client: HttpClient::new(
303 Self::default_headers(is_demo),
304 vec![],
305 Self::rate_limiter_quotas(),
306 Some(*OKX_REST_QUOTA),
307 timeout_secs,
308 proxy_url,
309 )
310 .map_err(|e| {
311 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
312 })?,
313 credential: None,
314 retry_manager,
315 cancellation_token: CancellationToken::new(),
316 is_demo,
317 })
318 }
319
320 #[allow(clippy::too_many_arguments)]
327 pub fn with_credentials(
328 api_key: String,
329 api_secret: String,
330 api_passphrase: String,
331 base_url: String,
332 timeout_secs: Option<u64>,
333 max_retries: Option<u32>,
334 retry_delay_ms: Option<u64>,
335 retry_delay_max_ms: Option<u64>,
336 is_demo: bool,
337 proxy_url: Option<String>,
338 ) -> Result<Self, OKXHttpError> {
339 let retry_config = RetryConfig {
340 max_retries: max_retries.unwrap_or(3),
341 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
342 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
343 backoff_factor: 2.0,
344 jitter_ms: 1000,
345 operation_timeout_ms: Some(60_000),
346 immediate_first: false,
347 max_elapsed_ms: Some(180_000),
348 };
349
350 let retry_manager = RetryManager::new(retry_config);
351
352 Ok(Self {
353 base_url,
354 client: HttpClient::new(
355 Self::default_headers(is_demo),
356 vec![],
357 Self::rate_limiter_quotas(),
358 Some(*OKX_REST_QUOTA),
359 timeout_secs,
360 proxy_url,
361 )
362 .map_err(|e| {
363 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
364 })?,
365 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
366 retry_manager,
367 cancellation_token: CancellationToken::new(),
368 is_demo,
369 })
370 }
371
372 fn default_headers(is_demo: bool) -> HashMap<String, String> {
374 let mut headers =
375 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
376
377 if is_demo {
378 headers.insert("x-simulated-trading".to_string(), "1".to_string());
379 }
380
381 headers
382 }
383
384 fn sign_request(
391 &self,
392 method: &Method,
393 path: &str,
394 body: Option<&[u8]>,
395 ) -> Result<HashMap<String, String>, OKXHttpError> {
396 let credential = match self.credential.as_ref() {
397 Some(c) => c,
398 None => return Err(OKXHttpError::MissingCredentials),
399 };
400
401 let api_key = credential.api_key.to_string();
402 let api_passphrase = credential.api_passphrase.clone();
403
404 let now = Utc::now();
406 let millis = now.timestamp_subsec_millis();
407 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
408 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
409
410 let mut headers = HashMap::new();
411 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
412 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
413 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
414 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
415
416 Ok(headers)
417 }
418
419 async fn send_request<T: DeserializeOwned, P: Serialize>(
435 &self,
436 method: Method,
437 path: &str,
438 params: Option<&P>,
439 body: Option<Vec<u8>>,
440 authenticate: bool,
441 ) -> Result<Vec<T>, OKXHttpError> {
442 let url = format!("{}{path}", self.base_url);
443
444 let rate_keys: Vec<String> = Self::rate_limit_keys(path)
446 .into_iter()
447 .map(|k| k.to_string())
448 .collect();
449
450 let operation = || {
451 let url = url.clone();
452 let method = method.clone();
453 let body = body.clone();
454 let rate_keys = rate_keys.clone();
455
456 async move {
457 let query_string = if let Some(p) = params {
459 serde_urlencoded::to_string(p).map_err(|e| {
460 OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
461 })?
462 } else {
463 String::new()
464 };
465
466 let full_path = if query_string.is_empty() {
468 path.to_string()
469 } else {
470 format!("{path}?{query_string}")
471 };
472
473 let mut headers = if authenticate {
474 self.sign_request(&method, &full_path, body.as_deref())?
475 } else {
476 HashMap::new()
477 };
478
479 if body.is_some() {
481 headers.insert("Content-Type".to_string(), "application/json".to_string());
482 }
483
484 let resp = self
485 .client
486 .request_with_params(
487 method.clone(),
488 url,
489 params,
490 Some(headers),
491 body,
492 None,
493 Some(rate_keys),
494 )
495 .await?;
496
497 log::trace!("Response: {resp:?}");
498
499 if resp.status.is_success() {
500 let okx_response: OKXResponse<T> =
501 serde_json::from_slice(&resp.body).map_err(|e| {
502 log::error!("Failed to deserialize OKXResponse: {e}");
503 OKXHttpError::JsonError(e.to_string())
504 })?;
505
506 if okx_response.code != OKX_SUCCESS_CODE {
507 return Err(OKXHttpError::OkxError {
508 error_code: okx_response.code,
509 message: resolve_okx_error_message(&resp.body, &okx_response.msg),
510 });
511 }
512
513 Ok(okx_response.data)
514 } else {
515 let error_body = String::from_utf8_lossy(&resp.body);
516 if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
517 log::debug!("HTTP 404 with body: {error_body}");
518 } else {
519 log::error!(
520 "HTTP error {} with body: {error_body}",
521 resp.status.as_str()
522 );
523 }
524
525 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
526 return Err(OKXHttpError::OkxError {
527 error_code: parsed_error.code,
528 message: resolve_okx_error_message(&resp.body, &parsed_error.msg),
529 });
530 }
531
532 Err(OKXHttpError::UnexpectedStatus {
533 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
534 body: error_body.to_string(),
535 })
536 }
537 }
538 };
539
540 let should_retry = |error: &OKXHttpError| -> bool {
549 match error {
550 OKXHttpError::HttpClientError(_) => true,
551 OKXHttpError::UnexpectedStatus { status, .. } => {
552 status.as_u16() >= 500 || status.as_u16() == 429
553 }
554 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
555 _ => false,
556 }
557 };
558
559 let create_error = |msg: String| -> OKXHttpError {
560 if msg == "canceled" {
561 OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
562 } else {
563 OKXHttpError::ValidationError(msg)
564 }
565 };
566
567 self.retry_manager
568 .execute_with_retry_with_cancel(
569 path,
570 operation,
571 should_retry,
572 create_error,
573 &self.cancellation_token,
574 )
575 .await
576 }
577
578 pub async fn set_position_mode(
589 &self,
590 params: SetPositionModeParams,
591 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
592 let path = "/api/v5/account/set-position-mode";
593 let body = serde_json::to_vec(¶ms)?;
594 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
595 .await
596 }
597
598 pub async fn get_position_tiers(
609 &self,
610 params: GetPositionTiersParams,
611 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
612 self.send_request(
613 Method::GET,
614 "/api/v5/public/position-tiers",
615 Some(¶ms),
616 None,
617 false,
618 )
619 .await
620 }
621
622 pub async fn get_instruments(
633 &self,
634 params: GetInstrumentsParams,
635 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
636 self.send_request(
637 Method::GET,
638 "/api/v5/public/instruments",
639 Some(¶ms),
640 None,
641 false,
642 )
643 .await
644 }
645
646 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
660 let response: Vec<OKXServerTime> = self
661 .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
662 .await?;
663 response
664 .first()
665 .map(|t| t.ts)
666 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
667 }
668
669 pub async fn get_mark_price(
683 &self,
684 params: GetMarkPriceParams,
685 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
686 self.send_request(
687 Method::GET,
688 "/api/v5/public/mark-price",
689 Some(¶ms),
690 None,
691 false,
692 )
693 .await
694 }
695
696 pub async fn get_index_tickers(
706 &self,
707 params: GetIndexTickerParams,
708 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
709 self.send_request(
710 Method::GET,
711 "/api/v5/market/index-tickers",
712 Some(¶ms),
713 None,
714 false,
715 )
716 .await
717 }
718
719 pub async fn get_history_trades(
729 &self,
730 params: GetTradesParams,
731 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
732 self.send_request(
733 Method::GET,
734 "/api/v5/market/history-trades",
735 Some(¶ms),
736 None,
737 false,
738 )
739 .await
740 }
741
742 pub async fn get_candles(
752 &self,
753 params: GetCandlesticksParams,
754 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
755 self.send_request(
756 Method::GET,
757 "/api/v5/market/candles",
758 Some(¶ms),
759 None,
760 false,
761 )
762 .await
763 }
764
765 pub async fn get_history_candles(
775 &self,
776 params: GetCandlesticksParams,
777 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
778 self.send_request(
779 Method::GET,
780 "/api/v5/market/history-candles",
781 Some(¶ms),
782 None,
783 false,
784 )
785 .await
786 }
787
788 pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
799 let path = "/api/v5/account/balance";
800 self.send_request::<_, ()>(Method::GET, path, None, None, true)
801 .await
802 }
803
804 pub async fn get_trade_fee(
816 &self,
817 params: GetTradeFeeParams,
818 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
819 self.send_request(
820 Method::GET,
821 "/api/v5/account/trade-fee",
822 Some(¶ms),
823 None,
824 true,
825 )
826 .await
827 }
828
829 pub async fn get_order(
839 &self,
840 params: GetOrderParams,
841 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
842 self.send_request(
843 Method::GET,
844 "/api/v5/trade/order",
845 Some(¶ms),
846 None,
847 true,
848 )
849 .await
850 }
851
852 pub async fn get_orders_pending(
862 &self,
863 params: GetOrderListParams,
864 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
865 self.send_request(
866 Method::GET,
867 "/api/v5/trade/orders-pending",
868 Some(¶ms),
869 None,
870 true,
871 )
872 .await
873 }
874
875 pub async fn get_orders_history(
885 &self,
886 params: GetOrderHistoryParams,
887 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
888 self.send_request(
889 Method::GET,
890 "/api/v5/trade/orders-history",
891 Some(¶ms),
892 None,
893 true,
894 )
895 .await
896 }
897
898 pub async fn get_order_algo_pending(
904 &self,
905 params: GetAlgoOrdersParams,
906 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
907 self.send_request(
908 Method::GET,
909 "/api/v5/trade/order-algo-pending",
910 Some(¶ms),
911 None,
912 true,
913 )
914 .await
915 }
916
917 pub async fn get_order_algo_history(
923 &self,
924 params: GetAlgoOrdersParams,
925 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
926 self.send_request(
927 Method::GET,
928 "/api/v5/trade/order-algo-history",
929 Some(¶ms),
930 None,
931 true,
932 )
933 .await
934 }
935
936 pub async fn get_fills(
946 &self,
947 params: GetTransactionDetailsParams,
948 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
949 self.send_request(
950 Method::GET,
951 "/api/v5/trade/fills",
952 Some(¶ms),
953 None,
954 true,
955 )
956 .await
957 }
958
959 pub async fn get_positions(
971 &self,
972 params: GetPositionsParams,
973 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
974 self.send_request(
975 Method::GET,
976 "/api/v5/account/positions",
977 Some(¶ms),
978 None,
979 true,
980 )
981 .await
982 }
983
984 pub async fn get_positions_history(
994 &self,
995 params: GetPositionsHistoryParams,
996 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
997 self.send_request(
998 Method::GET,
999 "/api/v5/account/positions-history",
1000 Some(¶ms),
1001 None,
1002 true,
1003 )
1004 .await
1005 }
1006}
1007
1008#[derive(Debug)]
1013#[cfg_attr(
1014 feature = "python",
1015 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx")
1016)]
1017pub struct OKXHttpClient {
1018 pub(crate) inner: Arc<OKXRawHttpClient>,
1019 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
1020 cache_initialized: AtomicBool,
1021}
1022
1023impl Clone for OKXHttpClient {
1024 fn clone(&self) -> Self {
1025 let cache_initialized = AtomicBool::new(false);
1026
1027 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1028 if is_initialized {
1029 cache_initialized.store(true, Ordering::Release);
1030 }
1031
1032 Self {
1033 inner: self.inner.clone(),
1034 instruments_cache: self.instruments_cache.clone(),
1035 cache_initialized,
1036 }
1037 }
1038}
1039
1040impl Default for OKXHttpClient {
1041 fn default() -> Self {
1042 Self::new(None, Some(60), None, None, None, false, None)
1043 .expect("Failed to create default OKXHttpClient")
1044 }
1045}
1046
1047impl OKXHttpClient {
1048 pub fn new(
1058 base_url: Option<String>,
1059 timeout_secs: Option<u64>,
1060 max_retries: Option<u32>,
1061 retry_delay_ms: Option<u64>,
1062 retry_delay_max_ms: Option<u64>,
1063 is_demo: bool,
1064 proxy_url: Option<String>,
1065 ) -> anyhow::Result<Self> {
1066 Ok(Self {
1067 inner: Arc::new(OKXRawHttpClient::new(
1068 base_url,
1069 timeout_secs,
1070 max_retries,
1071 retry_delay_ms,
1072 retry_delay_max_ms,
1073 is_demo,
1074 proxy_url,
1075 )?),
1076 instruments_cache: Arc::new(DashMap::new()),
1077 cache_initialized: AtomicBool::new(false),
1078 })
1079 }
1080
1081 fn generate_ts_init(&self) -> UnixNanos {
1083 get_atomic_clock_realtime().get_time_ns()
1084 }
1085
1086 pub fn from_env() -> anyhow::Result<Self> {
1093 Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1094 }
1095
1096 #[allow(clippy::too_many_arguments)]
1103 pub fn with_credentials(
1104 api_key: Option<String>,
1105 api_secret: Option<String>,
1106 api_passphrase: Option<String>,
1107 base_url: Option<String>,
1108 timeout_secs: Option<u64>,
1109 max_retries: Option<u32>,
1110 retry_delay_ms: Option<u64>,
1111 retry_delay_max_ms: Option<u64>,
1112 is_demo: bool,
1113 proxy_url: Option<String>,
1114 ) -> anyhow::Result<Self> {
1115 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1116 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1117 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1118 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1119
1120 Ok(Self {
1121 inner: Arc::new(OKXRawHttpClient::with_credentials(
1122 api_key,
1123 api_secret,
1124 api_passphrase,
1125 base_url,
1126 timeout_secs,
1127 max_retries,
1128 retry_delay_ms,
1129 retry_delay_max_ms,
1130 is_demo,
1131 proxy_url,
1132 )?),
1133 instruments_cache: Arc::new(DashMap::new()),
1134 cache_initialized: AtomicBool::new(false),
1135 })
1136 }
1137
1138 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1144 self.instruments_cache
1145 .get(&symbol)
1146 .map(|entry| entry.value().clone())
1147 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1148 }
1149
1150 pub fn cancel_all_requests(&self) {
1152 self.inner.cancel_all_requests();
1153 }
1154
1155 pub fn cancellation_token(&self) -> &CancellationToken {
1157 self.inner.cancellation_token()
1158 }
1159
1160 pub fn base_url(&self) -> &str {
1162 self.inner.base_url.as_str()
1163 }
1164
1165 pub fn api_key(&self) -> Option<&str> {
1167 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1168 }
1169
1170 #[must_use]
1172 pub fn api_key_masked(&self) -> Option<String> {
1173 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1174 }
1175
1176 #[must_use]
1178 pub fn is_demo(&self) -> bool {
1179 self.inner.is_demo
1180 }
1181
1182 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1190 self.inner.get_server_time().await
1191 }
1192
1193 #[must_use]
1197 pub fn is_initialized(&self) -> bool {
1198 self.cache_initialized.load(Ordering::Acquire)
1199 }
1200
1201 #[must_use]
1204 pub fn get_cached_symbols(&self) -> Vec<String> {
1205 self.instruments_cache
1206 .iter()
1207 .map(|entry| entry.key().to_string())
1208 .collect()
1209 }
1210
1211 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1215 for inst in instruments {
1216 self.instruments_cache
1217 .insert(inst.raw_symbol().inner(), inst);
1218 }
1219 self.cache_initialized.store(true, Ordering::Release);
1220 }
1221
1222 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1226 self.instruments_cache
1227 .insert(instrument.raw_symbol().inner(), instrument);
1228 self.cache_initialized.store(true, Ordering::Release);
1229 }
1230
1231 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1233 self.instruments_cache
1234 .get(symbol)
1235 .map(|entry| entry.value().clone())
1236 }
1237
1238 pub async fn request_account_state(
1244 &self,
1245 account_id: AccountId,
1246 ) -> anyhow::Result<AccountState> {
1247 let resp = self
1248 .inner
1249 .get_balance()
1250 .await
1251 .map_err(|e| anyhow::anyhow!(e))?;
1252
1253 let ts_init = self.generate_ts_init();
1254 let raw = resp
1255 .first()
1256 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1257 let account_state = parse_account_state(raw, account_id, ts_init)?;
1258
1259 Ok(account_state)
1260 }
1261
1262 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1275 let mut params = SetPositionModeParamsBuilder::default();
1276 params.pos_mode(position_mode);
1277 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1278
1279 match self.inner.set_position_mode(params).await {
1280 Ok(_) => Ok(()),
1281 Err(e) => {
1282 if let OKXHttpError::OkxError {
1283 error_code,
1284 message,
1285 } = &e
1286 && error_code == "50115"
1287 {
1288 log::warn!(
1289 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1290 );
1291 return Ok(()); }
1293 anyhow::bail!(e)
1294 }
1295 }
1296 }
1297
1298 pub async fn request_instruments(
1310 &self,
1311 instrument_type: OKXInstrumentType,
1312 instrument_family: Option<String>,
1313 ) -> anyhow::Result<(Vec<InstrumentAny>, Vec<(Ustr, u64)>)> {
1314 let mut params = GetInstrumentsParamsBuilder::default();
1315 params.inst_type(instrument_type);
1316
1317 if let Some(family) = instrument_family.clone() {
1318 params.inst_family(family);
1319 }
1320
1321 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1322
1323 let resp = self
1324 .inner
1325 .get_instruments(params)
1326 .await
1327 .map_err(|e| anyhow::anyhow!(e))?;
1328
1329 let fee_rate_opt = {
1330 let fee_params = GetTradeFeeParams {
1331 inst_type: instrument_type,
1332 uly: None,
1333 inst_family: instrument_family,
1334 };
1335
1336 match self.inner.get_trade_fee(fee_params).await {
1337 Ok(rates) => rates.into_iter().next(),
1338 Err(OKXHttpError::MissingCredentials) => {
1339 log::debug!("Missing credentials for fee rates, using None");
1340 None
1341 }
1342 Err(e) => {
1343 log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1344 None
1345 }
1346 }
1347 };
1348
1349 let ts_init = self.generate_ts_init();
1350
1351 let mut instruments: Vec<InstrumentAny> = Vec::new();
1352 let mut inst_id_codes: Vec<(Ustr, u64)> = Vec::new();
1353
1354 for inst in &resp {
1355 if let Some(code) = inst.inst_id_code {
1357 inst_id_codes.push((inst.inst_id, code));
1358 }
1359 if inst.state == OKXInstrumentStatus::Preopen {
1362 continue;
1363 }
1364
1365 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1370 let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1371 let (maker_str, taker_str) = if is_usdt_margined {
1372 (&fee_rate.maker_u, &fee_rate.taker_u)
1373 } else {
1374 (&fee_rate.maker, &fee_rate.taker)
1375 };
1376
1377 let maker = if maker_str.is_empty() {
1378 None
1379 } else {
1380 Decimal::from_str(maker_str).ok().map(|v| -v)
1381 };
1382 let taker = if taker_str.is_empty() {
1383 None
1384 } else {
1385 Decimal::from_str(taker_str).ok().map(|v| -v)
1386 };
1387
1388 (maker, taker)
1389 } else {
1390 (None, None)
1391 };
1392
1393 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1394 Ok(Some(instrument_any)) => {
1395 instruments.push(instrument_any);
1396 }
1397 Ok(None) => {
1398 }
1400 Err(e) => {
1401 log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1402 }
1403 }
1404 }
1405
1406 Ok((instruments, inst_id_codes))
1407 }
1408
1409 pub async fn request_instrument(
1420 &self,
1421 instrument_id: InstrumentId,
1422 ) -> anyhow::Result<InstrumentAny> {
1423 let symbol = instrument_id.symbol.as_str();
1424 let instrument_type = okx_instrument_type_from_symbol(symbol);
1425
1426 let mut params = GetInstrumentsParamsBuilder::default();
1427 params.inst_type(instrument_type);
1428 params.inst_id(symbol);
1429
1430 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1431
1432 let resp = self
1433 .inner
1434 .get_instruments(params)
1435 .await
1436 .map_err(|e| anyhow::anyhow!(e))?;
1437
1438 let raw_inst = resp
1439 .first()
1440 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1441
1442 if raw_inst.state == OKXInstrumentStatus::Preopen {
1444 anyhow::bail!("Instrument {symbol} is in pre-open state");
1445 }
1446
1447 let fee_rate_opt = {
1448 let fee_params = GetTradeFeeParams {
1449 inst_type: instrument_type,
1450 uly: None,
1451 inst_family: None,
1452 };
1453
1454 match self.inner.get_trade_fee(fee_params).await {
1455 Ok(rates) => rates.into_iter().next(),
1456 Err(OKXHttpError::MissingCredentials) => {
1457 log::debug!("Missing credentials for fee rates, using None");
1458 None
1459 }
1460 Err(e) => {
1461 log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1462 None
1463 }
1464 }
1465 };
1466
1467 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1471 let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1472 let (maker_str, taker_str) = if is_usdt_margined {
1473 (&fee_rate.maker_u, &fee_rate.taker_u)
1474 } else {
1475 (&fee_rate.maker, &fee_rate.taker)
1476 };
1477
1478 let maker = if maker_str.is_empty() {
1479 None
1480 } else {
1481 Decimal::from_str(maker_str).ok().map(|v| -v)
1482 };
1483 let taker = if taker_str.is_empty() {
1484 None
1485 } else {
1486 Decimal::from_str(taker_str).ok().map(|v| -v)
1487 };
1488
1489 (maker, taker)
1490 } else {
1491 (None, None)
1492 };
1493
1494 let ts_init = self.generate_ts_init();
1495 let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1496 .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1497
1498 self.cache_instrument(instrument.clone());
1499
1500 Ok(instrument)
1501 }
1502
1503 pub async fn request_mark_price(
1509 &self,
1510 instrument_id: InstrumentId,
1511 ) -> anyhow::Result<MarkPriceUpdate> {
1512 let mut params = GetMarkPriceParamsBuilder::default();
1513 params.inst_id(instrument_id.symbol.inner());
1514 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1515
1516 let resp = self
1517 .inner
1518 .get_mark_price(params)
1519 .await
1520 .map_err(|e| anyhow::anyhow!(e))?;
1521
1522 let raw = resp
1523 .first()
1524 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1525 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1526 let ts_init = self.generate_ts_init();
1527
1528 let mark_price =
1529 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1530 .map_err(|e| anyhow::anyhow!(e))?;
1531 Ok(mark_price)
1532 }
1533
1534 pub async fn request_index_price(
1540 &self,
1541 instrument_id: InstrumentId,
1542 ) -> anyhow::Result<IndexPriceUpdate> {
1543 let mut params = GetIndexTickerParamsBuilder::default();
1544 params.inst_id(instrument_id.symbol.inner());
1545 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1546
1547 let resp = self
1548 .inner
1549 .get_index_tickers(params)
1550 .await
1551 .map_err(|e| anyhow::anyhow!(e))?;
1552
1553 let raw = resp
1554 .first()
1555 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1556 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1557 let ts_init = self.generate_ts_init();
1558
1559 let index_price =
1560 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1561 .map_err(|e| anyhow::anyhow!(e))?;
1562 Ok(index_price)
1563 }
1564
1565 pub async fn request_trades(
1575 &self,
1576 instrument_id: InstrumentId,
1577 start: Option<DateTime<Utc>>,
1578 end: Option<DateTime<Utc>>,
1579 limit: Option<u32>,
1580 ) -> anyhow::Result<Vec<TradeTick>> {
1581 const OKX_TRADES_MAX_LIMIT: u32 = 100;
1582 const MAX_PAGES: usize = 500;
1583 const MAX_CONSECUTIVE_EMPTY: usize = 3;
1584
1585 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1586 enum Mode {
1587 Latest,
1588 Backward,
1589 Range,
1590 }
1591
1592 let limit = if limit == Some(0) { None } else { limit };
1593
1594 if let (Some(s), Some(e)) = (start, end) {
1595 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1596 }
1597
1598 let now = Utc::now();
1599
1600 if let Some(s) = start
1601 && s > now
1602 {
1603 return Ok(Vec::new());
1604 }
1605
1606 let end = if let Some(e) = end
1607 && e > now
1608 {
1609 Some(now)
1610 } else {
1611 end
1612 };
1613
1614 let mode = match (start, end) {
1615 (None, None) => Mode::Latest,
1616 (Some(_), None) => Mode::Backward,
1617 (None, Some(_)) => Mode::Backward,
1618 (Some(_), Some(_)) => Mode::Range,
1619 };
1620
1621 let start_ms = start.map(|s| s.timestamp_millis());
1622 let end_ms = end.map(|e| e.timestamp_millis());
1623
1624 let ts_init = self.generate_ts_init();
1625 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1626
1627 if matches!(mode, Mode::Backward | Mode::Range) {
1630 let mut before_trade_id: Option<String> = None;
1631 let mut pages = 0usize;
1632 let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1633 let mut seen_trades: std::collections::HashSet<(String, i64)> =
1634 std::collections::HashSet::new();
1635 let mut unique_count = 0usize;
1636 let mut consecutive_empty_pages = 0usize;
1637
1638 let effective_limit = if start.is_some() {
1641 limit.unwrap_or(u32::MAX)
1642 } else {
1643 limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1644 };
1645
1646 log::debug!(
1647 "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
1648 );
1649
1650 loop {
1651 if pages >= MAX_PAGES {
1652 log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
1653 break;
1654 }
1655
1656 if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1657 log::debug!("Reached effective limit: unique_count={unique_count}");
1658 break;
1659 }
1660
1661 let remaining = (effective_limit as usize).saturating_sub(unique_count);
1662 let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1663
1664 log::debug!(
1665 "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1666 pages + 1,
1667 before_trade_id,
1668 page_cap,
1669 unique_count
1670 );
1671
1672 let mut params_builder = GetTradesParamsBuilder::default();
1673 params_builder
1674 .inst_id(instrument_id.symbol.inner())
1675 .limit(page_cap)
1676 .pagination_type(1);
1677
1678 if let Some(ref before_id) = before_trade_id {
1680 params_builder.after(before_id.clone());
1681 }
1682
1683 let params = params_builder.build().map_err(anyhow::Error::new)?;
1684 let raw = self
1685 .inner
1686 .get_history_trades(params)
1687 .await
1688 .map_err(anyhow::Error::new)?;
1689
1690 log::debug!("Received {} raw trades from API", raw.len());
1691
1692 if !raw.is_empty() {
1693 let first_id = &raw.first().unwrap().trade_id;
1694 let last_id = &raw.last().unwrap().trade_id;
1695 log::debug!(
1696 "Raw response trade ID range: first={first_id} (newest), last={last_id} (oldest)"
1697 );
1698 }
1699
1700 if raw.is_empty() {
1701 log::debug!("API returned empty page, stopping pagination");
1702 break;
1703 }
1704
1705 pages += 1;
1706
1707 let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1708 let mut hit_start_boundary = false;
1709 let mut filtered_out = 0usize;
1710 let mut duplicates = 0usize;
1711
1712 for r in &raw {
1713 match parse_trade_tick(
1714 r,
1715 instrument_id,
1716 inst.price_precision(),
1717 inst.size_precision(),
1718 ts_init,
1719 ) {
1720 Ok(trade) => {
1721 let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1722
1723 if let Some(e_ms) = end_ms
1724 && ts_ms > e_ms
1725 {
1726 filtered_out += 1;
1727 continue;
1728 }
1729
1730 if let Some(s_ms) = start_ms
1731 && ts_ms < s_ms
1732 {
1733 hit_start_boundary = true;
1734 filtered_out += 1;
1735 break;
1736 }
1737
1738 let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1739 if seen_trades.insert(trade_key) {
1740 unique_count += 1;
1741 page_trades.push(trade);
1742 } else {
1743 duplicates += 1;
1744 }
1745 }
1746 Err(e) => log::error!("{e}"),
1747 }
1748 }
1749
1750 log::debug!(
1751 "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1752 pages,
1753 page_trades.len(),
1754 filtered_out,
1755 duplicates,
1756 hit_start_boundary
1757 );
1758
1759 let oldest_trade_id = if page_trades.is_empty() {
1761 if unique_count > 0 {
1764 consecutive_empty_pages += 1;
1765 if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1766 log::debug!(
1767 "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
1768 );
1769 break;
1770 }
1771 }
1772 raw.last().map(|t| {
1774 let id = t.trade_id.to_string();
1775 log::debug!(
1776 "Setting cursor from raw response (no unique trades): oldest_id={id}"
1777 );
1778 id
1779 })
1780 } else {
1781 let oldest_id = page_trades.last().map(|t| {
1783 let id = t.trade_id.to_string();
1784 log::debug!(
1785 "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1786 id,
1787 t.ts_event.as_i64()
1788 );
1789 id
1790 });
1791 page_trades.reverse();
1792 page_results.push(page_trades);
1793 consecutive_empty_pages = 0;
1794 oldest_id
1795 };
1796
1797 if let Some(ref old_id) = before_trade_id
1798 && oldest_trade_id.as_ref() == Some(old_id)
1799 {
1800 break;
1801 }
1802
1803 if oldest_trade_id.is_none() {
1804 break;
1805 }
1806
1807 before_trade_id = oldest_trade_id;
1808
1809 if hit_start_boundary {
1810 break;
1811 }
1812
1813 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1814 }
1815
1816 log::debug!(
1817 "Pagination complete: {pages} pages, {unique_count} unique trades collected"
1818 );
1819
1820 let mut out: Vec<TradeTick> = Vec::new();
1821 for page in page_results.into_iter().rev() {
1822 out.extend(page);
1823 }
1824
1825 let mut dedup_keys = std::collections::HashSet::new();
1827 let pre_dedup_len = out.len();
1828 out.retain(|trade| {
1829 dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1830 });
1831
1832 if out.len() < pre_dedup_len {
1833 log::debug!(
1834 "Removed {} duplicate trades during final dedup",
1835 pre_dedup_len - out.len()
1836 );
1837 }
1838
1839 if let Some(lim) = limit
1840 && lim > 0
1841 && out.len() > lim as usize
1842 {
1843 let excess = out.len() - lim as usize;
1844 log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
1845 out.drain(0..excess);
1846 }
1847
1848 log::debug!("Returning {} trades", out.len());
1849 return Ok(out);
1850 }
1851
1852 let req_limit = limit
1853 .unwrap_or(OKX_TRADES_MAX_LIMIT)
1854 .min(OKX_TRADES_MAX_LIMIT);
1855 let params = GetTradesParamsBuilder::default()
1856 .inst_id(instrument_id.symbol.inner())
1857 .limit(req_limit)
1858 .build()
1859 .map_err(anyhow::Error::new)?;
1860
1861 let raw = self
1862 .inner
1863 .get_history_trades(params)
1864 .await
1865 .map_err(anyhow::Error::new)?;
1866
1867 let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1868 for r in &raw {
1869 match parse_trade_tick(
1870 r,
1871 instrument_id,
1872 inst.price_precision(),
1873 inst.size_precision(),
1874 ts_init,
1875 ) {
1876 Ok(trade) => trades.push(trade),
1877 Err(e) => log::error!("{e}"),
1878 }
1879 }
1880
1881 trades.reverse();
1883
1884 if let Some(lim) = limit
1885 && lim > 0
1886 && trades.len() > lim as usize
1887 {
1888 trades.drain(0..trades.len() - lim as usize);
1889 }
1890
1891 Ok(trades)
1892 }
1893
/// Requests historical bars for `bar_type` from OKX, paginating over the
/// candlestick endpoints until the requested window or `limit` is satisfied.
///
/// The combination of `start` and `end` selects the pagination mode:
/// - neither given: `Latest`;
/// - only one given: `Backward`;
/// - both given: `Range`.
///
/// Each fetched page is reversed before use, so bars are accumulated with
/// the oldest first (assuming the API returns pages newest-first).
///
/// # Parameters
///
/// - `bar_type`: must use EXTERNAL aggregation with a second, minute, hour,
///   day, week or month step.
/// - `start` / `end`: optional time bounds. An `end` in the future is
///   clamped to now; a `start` in the future yields an empty result.
/// - `limit`: maximum number of bars to return; `Some(0)` is treated as
///   "no limit". When exceeded, the result is truncated to its first
///   `limit` elements.
///
/// # Errors
///
/// Returns an error if the aggregation source or aggregation kind is
/// unsupported, `start >= end`, the instrument is not in the cache, the
/// query parameters cannot be built, an HTTP request fails, or a
/// candlestick cannot be parsed.
pub async fn request_bars(
    &self,
    bar_type: BarType,
    start: Option<DateTime<Utc>>,
    mut end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<Bar>> {
    // Age threshold (in days) used by the fallback requests below to pick
    // the history endpoint over the regular candles endpoint.
    const HISTORY_SPLIT_DAYS: i64 = 100;
    // Upper bound on the number of non-empty pages processed.
    const MAX_PAGES_SOFT: usize = 500;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Mode {
        Latest,
        Backward,
        Range,
    }

    // A limit of zero means "no limit".
    let limit = if limit == Some(0) { None } else { limit };

    anyhow::ensure!(
        bar_type.aggregation_source() == AggregationSource::External,
        "Only EXTERNAL aggregation is supported"
    );

    if let (Some(s), Some(e)) = (start, end) {
        anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
    }

    let now = Utc::now();

    // A window that starts in the future cannot contain any bars.
    if let Some(s) = start
        && s > now
    {
        return Ok(Vec::new());
    }
    // Clamp a future `end` to the current time.
    if let Some(e) = end
        && e > now
    {
        end = Some(now);
    }

    let spec = bar_type.spec();
    let step = spec.step.get();
    // Bar size string sent as the `bar` query parameter.
    let bar_param = match spec.aggregation {
        BarAggregation::Second => format!("{step}s"),
        BarAggregation::Minute => format!("{step}m"),
        BarAggregation::Hour => format!("{step}H"),
        BarAggregation::Day => format!("{step}D"),
        BarAggregation::Week => format!("{step}W"),
        BarAggregation::Month => format!("{step}M"),
        a => anyhow::bail!("OKX does not support {a:?} aggregation"),
    };

    // Nominal duration of one bar in milliseconds; a month is approximated
    // as 30 days.
    let slot_ms: i64 = match spec.aggregation {
        BarAggregation::Second => (step as i64) * 1_000,
        BarAggregation::Minute => (step as i64) * 60_000,
        BarAggregation::Hour => (step as i64) * 3_600_000,
        BarAggregation::Day => (step as i64) * 86_400_000,
        BarAggregation::Week => (step as i64) * 7 * 86_400_000,
        BarAggregation::Month => (step as i64) * 30 * 86_400_000,
        _ => unreachable!("Unsupported aggregation should have been caught above"),
    };
    let slot_ns: i64 = slot_ms * 1_000_000;

    let mode = match (start, end) {
        (None, None) => Mode::Latest,
        (Some(_), None) => Mode::Backward,
        (None, Some(_)) => Mode::Backward,
        (Some(_), Some(_)) => Mode::Range,
    };

    // Exact bounds in nanoseconds, used for filtering parsed bars.
    let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
    let end_ns = end.and_then(|e| e.timestamp_nanos_opt());

    // Request bounds in milliseconds, widened to whole slots: `start` is
    // rounded down and `end` is rounded up to a multiple of `slot_ms`.
    let start_ms = start.map(|s| {
        let ms = s.timestamp_millis();
        if slot_ms > 0 {
            (ms / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let end_ms = end.map(|e| {
        let ms = e.timestamp_millis();
        if slot_ms > 0 {
            ((ms + slot_ms - 1) / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let now_ms = now.timestamp_millis();

    let symbol = bar_type.instrument_id().symbol;
    let inst = self.instrument_from_cache(symbol.inner())?;

    // Accumulated result, kept oldest-first.
    let mut out: Vec<Bar> = Vec::new();
    let mut pages = 0usize;

    // Pagination cursors in milliseconds. Which request parameter each one
    // feeds depends on the mode (see the `match mode` inside the loop).
    let mut after_ms: Option<i64> = match mode {
        Mode::Range => end_ms.or(Some(now_ms)),
        _ => None,
    };
    let mut before_ms: Option<i64> = match mode {
        Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
        Mode::Range => start_ms,
        Mode::Latest => None,
    };

    // In Range mode, new pages are merged in front of `out` while this is
    // set; it is cleared when the loop switches to filling forward.
    let mut forward_prepend_mode = matches!(mode, Mode::Range);

    // Keep the initial `before_ms` cursor at least one slot (minimum one
    // minute) behind the current time.
    if matches!(mode, Mode::Backward | Mode::Range)
        && let Some(b) = before_ms
    {
        let buffer_ms = slot_ms.max(60_000);
        if b >= now_ms.saturating_sub(buffer_ms) {
            before_ms = Some(now_ms.saturating_sub(buffer_ms));
        }
    }

    // Set once a first page has been handled (or skipped) in Latest mode.
    let mut have_latest_first_page = false;
    // Consecutive iterations that added nothing to `out`; three ends the loop.
    let mut progressless_loops = 0u8;

    loop {
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if pages >= MAX_PAGES_SOFT {
            break;
        }

        // Reference time for choosing the endpoint: prefer `after_ms`, then
        // `before_ms`, otherwise now.
        let pivot_ms = if let Some(a) = after_ms {
            a
        } else if let Some(b) = before_ms {
            b
        } else {
            now_ms
        };
        let age_ms = now_ms.saturating_sub(pivot_ms);
        let age_hours = age_ms / (60 * 60 * 1000);
        // NOTE(review): the main loop switches to the history endpoint once
        // the pivot is more than one hour old, whereas the fallback requests
        // below use HISTORY_SPLIT_DAYS; confirm the differing thresholds
        // are intentional.
        let using_history = age_hours > 1;
        // Page size cap: 100 when using the history endpoint, 300 otherwise.
        let page_ceiling = if using_history { 100 } else { 300 };
        let remaining = limit
            .filter(|&l| l > 0)
            .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
        let page_cap = remaining.min(page_ceiling);

        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(page_cap as u32);

        // Records whether this request set the `before` parameter; this
        // affects how a Range-mode page is merged into `out`.
        let mut req_used_before = false;

        // NOTE(review): Latest mode sends the `before_ms` cursor via
        // `before_ms(..)` while Backward mode sends the same cursor via
        // `after_ms(..)`; confirm this asymmetry against the OKX
        // pagination semantics.
        match mode {
            Mode::Latest => {
                if have_latest_first_page && let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
            Mode::Backward => {
                if let Some(b) = before_ms {
                    p.after_ms(b);
                }
            }
            Mode::Range => {
                if let Some(a) = after_ms {
                    p.after_ms(a);
                }
                if let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
        }

        let params = p.build().map_err(anyhow::Error::new)?;

        let mut raw = if using_history {
            self.inner
                .get_history_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        } else {
            self.inner
                .get_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        };

        // Empty page: try the mode-specific fallbacks before giving up.
        if raw.is_empty() {
            // Latest mode, after the first page, regular endpoint returned
            // nothing: retry the same cursor on the history endpoint.
            if matches!(mode, Mode::Latest)
                && have_latest_first_page
                && !using_history
                && let Some(b) = before_ms
            {
                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32);
                p2.before_ms(b);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                let raw2 = self
                    .inner
                    .get_history_candles(params2)
                    .await
                    .map_err(anyhow::Error::new)?;
                if raw2.is_empty() {
                    // Still nothing: move the cursor back by one page worth
                    // of slots and retry, up to three times.
                    let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
                    before_ms = Some(b.saturating_sub(jump));
                    progressless_loops = progressless_loops.saturating_add(1);
                    if progressless_loops >= 3 {
                        break;
                    }
                    continue;
                } else {
                    raw = raw2;
                }
            }

            // Range mode after at least one page: step one page worth of
            // slots back from `after_ms` and query with `before` only.
            if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
                let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
                let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);

                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32)
                    .before_ms(pivot_back);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
                    > HISTORY_SPLIT_DAYS
                {
                    self.inner.get_history_candles(params2).await
                } else {
                    self.inner.get_candles(params2).await
                }
                .map_err(anyhow::Error::new)?;
                if raw2.is_empty() {
                    break;
                } else {
                    raw = raw2;
                    forward_prepend_mode = true;
                    req_used_before = true;
                }
            }

            // Latest mode, very first request on the regular endpoint came
            // back empty: jump the cursor past HISTORY_SPLIT_DAYS and retry.
            if raw.is_empty()
                && matches!(mode, Mode::Latest)
                && !have_latest_first_page
                && !using_history
            {
                let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
                before_ms = Some(now_ms.saturating_sub(jump_days_ms));
                have_latest_first_page = true;
                continue;
            }

            // No fallback produced data.
            if raw.is_empty() {
                break;
            }
        }
        pages += 1;

        let ts_init = self.generate_ts_init();
        let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
        for r in &raw {
            page.push(parse_candlestick(
                r,
                bar_type,
                inst.price_precision(),
                inst.size_precision(),
                ts_init,
            )?);
        }
        // Put the page in oldest-first order.
        page.reverse();

        let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
        let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);

        // Keep only bars inside the requested window. The first Range-mode
        // page tolerates bars up to two slots before `start`.
        let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
            && out.is_empty()
            && pages < 2
        {
            let tolerance_ns = slot_ns * 2;
            if !page.is_empty() {
                log::debug!(
                    "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
                    page.len(),
                    page.first().unwrap().ts_event.as_i64() / 1_000_000,
                    page.last().unwrap().ts_event.as_i64() / 1_000_000,
                    start_ms,
                    end_ms
                );
            }

            let result: Vec<Bar> = page
                .clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after =
                        start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();

            result
        } else {
            page.clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect()
        };

        // Range mode filling forward: a page whose newest bar is already
        // more than two slots before `start` cannot contribute anything.
        if !page.is_empty() && filtered.is_empty() {
            if matches!(mode, Mode::Range)
                && !forward_prepend_mode
                && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
                && newest_ms < start_ms.saturating_sub(slot_ms * 2)
            {
                break;
            }
        }

        // Number of bars this page actually added to `out`.
        let contribution;

        if out.is_empty() {
            contribution = filtered.len();
            out = filtered;
        } else {
            match mode {
                Mode::Backward | Mode::Latest => {
                    // Prepend only bars strictly older than what we have.
                    if let Some(first) = out.first() {
                        filtered.retain(|b| b.ts_event < first.ts_event);
                    }
                    contribution = filtered.len();
                    if contribution != 0 {
                        let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                        new_out.extend_from_slice(&filtered);
                        new_out.extend_from_slice(&out);
                        out = new_out;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        // Prepend only bars strictly older than what we have.
                        if let Some(first) = out.first() {
                            filtered.retain(|b| b.ts_event < first.ts_event);
                        }
                        contribution = filtered.len();
                        if contribution != 0 {
                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                            new_out.extend_from_slice(&filtered);
                            new_out.extend_from_slice(&out);
                            out = new_out;
                        }
                    } else {
                        // Append only bars strictly newer than what we have.
                        if let Some(last) = out.last() {
                            filtered.retain(|b| b.ts_event > last.ts_event);
                        }
                        contribution = filtered.len();
                        out.extend(filtered);
                    }
                }
            }
        }

        // Nothing new: move the `before_ms` cursor back by one page worth
        // of slots so the next request covers a different span.
        if contribution == 0
            && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
            && let Some(b) = before_ms
        {
            let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
            let new_b = b.saturating_sub(jump);
            if new_b != b {
                before_ms = Some(new_b);
            }
        }

        if contribution == 0 {
            progressless_loops = progressless_loops.saturating_add(1);
            if progressless_loops >= 3 {
                break;
            }
        } else {
            progressless_loops = 0;

            // Advance the cursors from the page that was just merged.
            match mode {
                Mode::Latest | Mode::Backward => {
                    if let Some(oldest) = page_oldest_ms {
                        before_ms = Some(oldest.saturating_sub(1));
                        have_latest_first_page = true;
                    } else {
                        break;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        if let Some(oldest) = page_oldest_ms {
                            // Step back at least one minute or one slot.
                            let jump_back = slot_ms.max(60_000);
                            before_ms = Some(oldest.saturating_sub(jump_back));
                            after_ms = None;
                        } else {
                            break;
                        }
                    } else if let Some(newest) = page_newest_ms {
                        after_ms = Some(newest.saturating_add(1));
                        before_ms = None;
                    } else {
                        break;
                    }
                }
            }
        }

        // Termination checks: limit reached, end bound reached, or start
        // bound reached while walking backward.
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if let Some(ens) = end_ns
            && let Some(last) = out.last()
            && last.ts_event.as_i64() >= ens
        {
            break;
        }
        if let Some(sns) = start_ns
            && let Some(first) = out.first()
            && (matches!(mode, Mode::Backward) || forward_prepend_mode)
            && first.ts_event.as_i64() <= sns
        {
            if matches!(mode, Mode::Range) {
                // The start of the window is covered but the end is not:
                // switch to filling forward from the newest bar collected.
                if let Some(ens) = end_ns
                    && let Some(last) = out.last()
                {
                    let last_ts = last.ts_event.as_i64();
                    if last_ts < ens {
                        forward_prepend_mode = false;
                        after_ms = Some((last_ts / 1_000_000).saturating_add(1));
                        before_ms = None;
                        continue;
                    }
                }
            }
            break;
        }

        // Brief pause between page requests.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }

    // Range mode produced nothing: make one last request anchored at the
    // (rounded) end of the window and filter it to the exact bounds.
    if out.is_empty() && matches!(mode, Mode::Range) {
        let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
        let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(300)
            .before_ms(pivot);
        let params = p.build().map_err(anyhow::Error::new)?;
        let raw = if hist {
            self.inner.get_history_candles(params).await
        } else {
            self.inner.get_candles(params).await
        }
        .map_err(anyhow::Error::new)?;
        if !raw.is_empty() {
            let ts_init = self.generate_ts_init();
            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
            for r in &raw {
                page.push(parse_candlestick(
                    r,
                    bar_type,
                    inst.price_precision(),
                    inst.size_precision(),
                    ts_init,
                )?);
            }
            page.reverse();
            out = page
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();
        }
    }

    // Drop any trailing bars newer than `end`.
    if let Some(ens) = end_ns {
        while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
            out.pop();
        }
    }

    // Range mode that finished filling forward: drop leading bars more than
    // one slot before `start`.
    if matches!(mode, Mode::Range)
        && !forward_prepend_mode
        && let Some(sns) = start_ns
    {
        let lower = sns.saturating_sub(slot_ns);
        while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
            out.remove(0);
        }
    }

    // Enforce the limit by keeping the first `lim` elements of `out`.
    if let Some(lim) = limit
        && lim > 0
        && out.len() > lim as usize
    {
        out.truncate(lim as usize);
    }

    Ok(out)
}
2522
2523 #[allow(clippy::too_many_arguments)]
2534 pub async fn request_order_status_reports(
2535 &self,
2536 account_id: AccountId,
2537 instrument_type: Option<OKXInstrumentType>,
2538 instrument_id: Option<InstrumentId>,
2539 start: Option<DateTime<Utc>>,
2540 end: Option<DateTime<Utc>>,
2541 open_only: bool,
2542 limit: Option<u32>,
2543 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2544 let mut history_params = GetOrderHistoryParamsBuilder::default();
2545
2546 let instrument_type = if let Some(instrument_type) = instrument_type {
2547 instrument_type
2548 } else {
2549 let instrument_id = instrument_id.ok_or_else(|| {
2550 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2551 })?;
2552 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2553 okx_instrument_type(&instrument)?
2554 };
2555
2556 history_params.inst_type(instrument_type);
2557
2558 if let Some(instrument_id) = instrument_id.as_ref() {
2559 history_params.inst_id(instrument_id.symbol.inner().to_string());
2560 }
2561
2562 if let Some(limit) = limit {
2563 history_params.limit(limit);
2564 }
2565
2566 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2567
2568 let mut pending_params = GetOrderListParamsBuilder::default();
2569 pending_params.inst_type(instrument_type);
2570
2571 if let Some(instrument_id) = instrument_id.as_ref() {
2572 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2573 }
2574
2575 if let Some(limit) = limit {
2576 pending_params.limit(limit);
2577 }
2578
2579 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2580
2581 let combined_resp = if open_only {
2582 self.inner
2584 .get_orders_pending(pending_params)
2585 .await
2586 .map_err(|e| anyhow::anyhow!(e))?
2587 } else {
2588 let (history_resp, pending_resp) = tokio::try_join!(
2590 self.inner.get_orders_history(history_params),
2591 self.inner.get_orders_pending(pending_params)
2592 )
2593 .map_err(|e| anyhow::anyhow!(e))?;
2594
2595 let mut combined_resp = history_resp;
2597 combined_resp.extend(pending_resp);
2598 combined_resp
2599 };
2600
2601 let start_ns = start.map(UnixNanos::from);
2603 let end_ns = end.map(UnixNanos::from);
2604
2605 let ts_init = self.generate_ts_init();
2606 let mut reports = Vec::with_capacity(combined_resp.len());
2607
2608 let mut seen: AHashSet<String> = AHashSet::new();
2610
2611 for order in combined_resp {
2612 let seen_key = if !order.cl_ord_id.is_empty() {
2613 order.cl_ord_id.as_str().to_string()
2614 } else if let Some(algo_cl_ord_id) = order
2615 .algo_cl_ord_id
2616 .as_ref()
2617 .filter(|value| !value.as_str().is_empty())
2618 {
2619 algo_cl_ord_id.as_str().to_string()
2620 } else if let Some(algo_id) = order
2621 .algo_id
2622 .as_ref()
2623 .filter(|value| !value.as_str().is_empty())
2624 {
2625 algo_id.as_str().to_string()
2626 } else {
2627 order.ord_id.as_str().to_string()
2628 };
2629
2630 if !seen.insert(seen_key) {
2631 continue; }
2633
2634 let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2635 log::debug!(
2636 "Skipping order report for instrument not in cache: symbol={}",
2637 order.inst_id,
2638 );
2639 continue;
2640 };
2641
2642 let report = match parse_order_status_report(
2643 &order,
2644 account_id,
2645 inst.id(),
2646 inst.price_precision(),
2647 inst.size_precision(),
2648 ts_init,
2649 ) {
2650 Ok(report) => report,
2651 Err(e) => {
2652 log::error!("Failed to parse order status report: {e}");
2653 continue;
2654 }
2655 };
2656
2657 if let Some(start_ns) = start_ns
2658 && report.ts_last < start_ns
2659 {
2660 continue;
2661 }
2662 if let Some(end_ns) = end_ns
2663 && report.ts_last > end_ns
2664 {
2665 continue;
2666 }
2667
2668 reports.push(report);
2669 }
2670
2671 Ok(reports)
2672 }
2673
2674 pub async fn request_fill_reports(
2684 &self,
2685 account_id: AccountId,
2686 instrument_type: Option<OKXInstrumentType>,
2687 instrument_id: Option<InstrumentId>,
2688 start: Option<DateTime<Utc>>,
2689 end: Option<DateTime<Utc>>,
2690 limit: Option<u32>,
2691 ) -> anyhow::Result<Vec<FillReport>> {
2692 let mut params = GetTransactionDetailsParamsBuilder::default();
2693
2694 let instrument_type = if let Some(instrument_type) = instrument_type {
2695 instrument_type
2696 } else {
2697 let instrument_id = instrument_id.ok_or_else(|| {
2698 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2699 })?;
2700 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2701 okx_instrument_type(&instrument)?
2702 };
2703
2704 params.inst_type(instrument_type);
2705
2706 if let Some(instrument_id) = instrument_id {
2707 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2708 let instrument_type = okx_instrument_type(&instrument)?;
2709 params.inst_type(instrument_type);
2710 params.inst_id(instrument_id.symbol.inner().to_string());
2711 }
2712
2713 if let Some(limit) = limit {
2714 params.limit(limit);
2715 }
2716
2717 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2718
2719 let resp = self
2720 .inner
2721 .get_fills(params)
2722 .await
2723 .map_err(|e| anyhow::anyhow!(e))?;
2724
2725 let start_ns = start.map(UnixNanos::from);
2727 let end_ns = end.map(UnixNanos::from);
2728
2729 let ts_init = self.generate_ts_init();
2730 let mut reports = Vec::with_capacity(resp.len());
2731
2732 for detail in resp {
2733 if detail.fill_sz.is_empty() {
2735 continue;
2736 }
2737 if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2738 if qty <= 0.0 {
2739 continue;
2740 }
2741 } else {
2742 continue;
2744 }
2745
2746 let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2747 log::debug!(
2748 "Skipping fill report for instrument not in cache: symbol={}",
2749 detail.inst_id,
2750 );
2751 continue;
2752 };
2753
2754 let report = match parse_fill_report(
2755 detail,
2756 account_id,
2757 inst.id(),
2758 inst.price_precision(),
2759 inst.size_precision(),
2760 ts_init,
2761 ) {
2762 Ok(report) => report,
2763 Err(e) => {
2764 log::error!("Failed to parse fill report: {e}");
2765 continue;
2766 }
2767 };
2768
2769 if let Some(start_ns) = start_ns
2770 && report.ts_event < start_ns
2771 {
2772 continue;
2773 }
2774
2775 if let Some(end_ns) = end_ns
2776 && report.ts_event > end_ns
2777 {
2778 continue;
2779 }
2780
2781 reports.push(report);
2782 }
2783
2784 Ok(reports)
2785 }
2786
2787 pub async fn request_position_status_reports(
2814 &self,
2815 account_id: AccountId,
2816 instrument_type: Option<OKXInstrumentType>,
2817 instrument_id: Option<InstrumentId>,
2818 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2819 let mut params = GetPositionsParamsBuilder::default();
2820
2821 let instrument_type = if let Some(instrument_type) = instrument_type {
2822 instrument_type
2823 } else {
2824 let instrument_id = instrument_id.ok_or_else(|| {
2825 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2826 })?;
2827 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2828 okx_instrument_type(&instrument)?
2829 };
2830
2831 params.inst_type(instrument_type);
2832
2833 instrument_id
2834 .as_ref()
2835 .map(|i| params.inst_id(i.symbol.inner()));
2836
2837 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2838
2839 let resp = self
2840 .inner
2841 .get_positions(params)
2842 .await
2843 .map_err(|e| anyhow::anyhow!(e))?;
2844
2845 let ts_init = self.generate_ts_init();
2846 let mut reports = Vec::with_capacity(resp.len());
2847
2848 for position in resp {
2849 let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2850 log::debug!(
2851 "Skipping position report for instrument not in cache: symbol={}",
2852 position.inst_id,
2853 );
2854 continue;
2855 };
2856
2857 match parse_position_status_report(
2858 position,
2859 account_id,
2860 inst.id(),
2861 inst.size_precision(),
2862 ts_init,
2863 ) {
2864 Ok(report) => reports.push(report),
2865 Err(e) => {
2866 log::error!("Failed to parse position status report: {e}");
2867 continue;
2868 }
2869 };
2870 }
2871
2872 Ok(reports)
2873 }
2874
2875 pub async fn request_spot_margin_position_reports(
2890 &self,
2891 account_id: AccountId,
2892 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2893 let accounts = self
2894 .inner
2895 .get_balance()
2896 .await
2897 .map_err(|e| anyhow::anyhow!(e))?;
2898
2899 let ts_init = self.generate_ts_init();
2900 let mut reports = Vec::new();
2901
2902 for account in accounts {
2903 for balance in account.details {
2904 let ccy_str = balance.ccy.as_str();
2905
2906 let potential_symbols = [
2908 format!("{ccy_str}-USDT"),
2909 format!("{ccy_str}-USD"),
2910 format!("{ccy_str}-USDC"),
2911 ];
2912
2913 let instrument_result = potential_symbols.iter().find_map(|symbol| {
2914 self.instrument_from_cache(Ustr::from(symbol))
2915 .ok()
2916 .map(|inst| (inst.id(), inst.size_precision()))
2917 });
2918
2919 let (instrument_id, size_precision) = match instrument_result {
2920 Some((id, prec)) => (id, prec),
2921 None => {
2922 log::debug!(
2923 "Skipping balance for {ccy_str} - no matching instrument in cache"
2924 );
2925 continue;
2926 }
2927 };
2928
2929 match parse_spot_margin_position_from_balance(
2930 &balance,
2931 account_id,
2932 instrument_id,
2933 size_precision,
2934 ts_init,
2935 ) {
2936 Ok(Some(report)) => reports.push(report),
2937 Ok(None) => {} Err(e) => {
2939 log::error!(
2940 "Failed to parse spot margin position from balance for {ccy_str}: {e}"
2941 );
2942 continue;
2943 }
2944 };
2945 }
2946 }
2947
2948 Ok(reports)
2949 }
2950
2951 pub async fn place_algo_order(
2961 &self,
2962 request: OKXPlaceAlgoOrderRequest,
2963 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2964 let body =
2965 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2966
2967 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2968 .inner
2969 .send_request::<_, ()>(
2970 Method::POST,
2971 "/api/v5/trade/order-algo",
2972 None,
2973 Some(body),
2974 true,
2975 )
2976 .await?;
2977
2978 resp.into_iter()
2979 .next()
2980 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2981 }
2982
2983 pub async fn cancel_algo_order(
2993 &self,
2994 request: OKXCancelAlgoOrderRequest,
2995 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2996 let body =
2999 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3000
3001 let resp: Vec<OKXCancelAlgoOrderResponse> = self
3002 .inner
3003 .send_request::<_, ()>(
3004 Method::POST,
3005 "/api/v5/trade/cancel-algos",
3006 None,
3007 Some(body),
3008 true,
3009 )
3010 .await?;
3011
3012 resp.into_iter()
3013 .next()
3014 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
3015 }
3016
3017 pub async fn cancel_algo_orders(
3027 &self,
3028 requests: Vec<OKXCancelAlgoOrderRequest>,
3029 ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3030 if requests.is_empty() {
3031 return Ok(Vec::new());
3032 }
3033
3034 let body =
3035 serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3036
3037 self.inner
3038 .send_request::<_, ()>(
3039 Method::POST,
3040 "/api/v5/trade/cancel-algos",
3041 None,
3042 Some(body),
3043 true,
3044 )
3045 .await
3046 }
3047
3048 #[allow(clippy::too_many_arguments)]
3057 pub async fn place_algo_order_with_domain_types(
3058 &self,
3059 instrument_id: InstrumentId,
3060 td_mode: OKXTradeMode,
3061 client_order_id: ClientOrderId,
3062 order_side: OrderSide,
3063 order_type: OrderType,
3064 quantity: Quantity,
3065 trigger_price: Price,
3066 trigger_type: Option<TriggerType>,
3067 limit_price: Option<Price>,
3068 reduce_only: Option<bool>,
3069 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3070 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3071 return Err(OKXHttpError::ValidationError(
3072 "Invalid order side".to_string(),
3073 ));
3074 }
3075 let okx_side: OKXSide = order_side.into();
3076
3077 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3079
3080 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3082 limit_price.map(|p| p.to_string())
3083 } else {
3084 Some("-1".to_string())
3086 };
3087
3088 let request = OKXPlaceAlgoOrderRequest {
3089 inst_id: instrument_id.symbol.as_str().to_string(),
3090 inst_id_code: None,
3091 td_mode,
3092 side: okx_side,
3093 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
3095 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3096 trigger_px: Some(trigger_price.to_string()),
3097 order_px,
3098 trigger_px_type: Some(trigger_px_type_enum),
3099 tgt_ccy: None, pos_side: None, close_position: None,
3102 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3103 reduce_only,
3104 };
3105
3106 self.place_algo_order(request).await
3107 }
3108
3109 pub async fn cancel_algo_order_with_domain_types(
3118 &self,
3119 instrument_id: InstrumentId,
3120 algo_id: String,
3121 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3122 let request = OKXCancelAlgoOrderRequest {
3123 inst_id: instrument_id.symbol.to_string(),
3124 inst_id_code: None,
3125 algo_id: Some(algo_id),
3126 algo_cl_ord_id: None,
3127 };
3128
3129 self.cancel_algo_order(request).await
3130 }
3131
3132 #[allow(clippy::too_many_arguments)]
3138 pub async fn request_algo_order_status_reports(
3139 &self,
3140 account_id: AccountId,
3141 instrument_type: Option<OKXInstrumentType>,
3142 instrument_id: Option<InstrumentId>,
3143 algo_id: Option<String>,
3144 algo_client_order_id: Option<ClientOrderId>,
3145 state: Option<OKXOrderStatus>,
3146 limit: Option<u32>,
3147 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3148 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3149
3150 let inst_type = if let Some(inst_type) = instrument_type {
3151 inst_type
3152 } else if let Some(inst_id) = instrument_id {
3153 let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3154 let inst_type = okx_instrument_type(&instrument)?;
3155 instruments_cache.insert(inst_id.symbol.inner(), instrument);
3156 inst_type
3157 } else {
3158 anyhow::bail!("instrument_type or instrument_id required for algo order query")
3159 };
3160
3161 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3162 params_builder.inst_type(inst_type);
3163 if let Some(inst_id) = instrument_id {
3164 params_builder.inst_id(inst_id.symbol.inner().to_string());
3165 }
3166 if let Some(algo_id) = algo_id.as_ref() {
3167 params_builder.algo_id(algo_id.clone());
3168 }
3169 if let Some(client_order_id) = algo_client_order_id.as_ref() {
3170 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3171 }
3172 if let Some(state) = state {
3173 params_builder.state(state);
3174 }
3175 if let Some(limit) = limit {
3176 params_builder.limit(limit);
3177 }
3178
3179 let params = params_builder
3180 .build()
3181 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3182
3183 let ts_init = self.generate_ts_init();
3184 let mut reports = Vec::new();
3185 let mut seen: AHashSet<(String, String)> = AHashSet::new();
3186
3187 let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3188 Ok(result) => result,
3189 Err(OKXHttpError::UnexpectedStatus { status, .. })
3190 if status == StatusCode::NOT_FOUND =>
3191 {
3192 Vec::new()
3193 }
3194 Err(e) => return Err(e.into()),
3195 };
3196 self.collect_algo_reports(
3197 account_id,
3198 &pending,
3199 &mut instruments_cache,
3200 ts_init,
3201 &mut seen,
3202 &mut reports,
3203 )
3204 .await?;
3205
3206 let history = match self.inner.get_order_algo_history(params).await {
3207 Ok(result) => result,
3208 Err(OKXHttpError::UnexpectedStatus { status, .. })
3209 if status == StatusCode::NOT_FOUND =>
3210 {
3211 Vec::new()
3212 }
3213 Err(e) => return Err(e.into()),
3214 };
3215 self.collect_algo_reports(
3216 account_id,
3217 &history,
3218 &mut instruments_cache,
3219 ts_init,
3220 &mut seen,
3221 &mut reports,
3222 )
3223 .await?;
3224
3225 Ok(reports)
3226 }
3227
3228 pub async fn request_algo_order_status_report(
3234 &self,
3235 account_id: AccountId,
3236 instrument_id: InstrumentId,
3237 algo_client_order_id: ClientOrderId,
3238 ) -> anyhow::Result<Option<OrderStatusReport>> {
3239 let reports = self
3240 .request_algo_order_status_reports(
3241 account_id,
3242 None,
3243 Some(instrument_id),
3244 None,
3245 Some(algo_client_order_id),
3246 None,
3247 Some(50_u32),
3248 )
3249 .await?;
3250
3251 Ok(reports.into_iter().next())
3252 }
3253
3254 pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3256 &self.inner
3257 }
3258
3259 async fn collect_algo_reports(
3260 &self,
3261 account_id: AccountId,
3262 orders: &[OKXOrderAlgo],
3263 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3264 ts_init: UnixNanos,
3265 seen: &mut AHashSet<(String, String)>,
3266 reports: &mut Vec<OrderStatusReport>,
3267 ) -> anyhow::Result<()> {
3268 for order in orders {
3269 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3270 if !seen.insert(key) {
3271 continue;
3272 }
3273
3274 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3275 instrument.clone()
3276 } else {
3277 let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3278 log::debug!(
3279 "Skipping algo order report for instrument not in cache: symbol={}",
3280 order.inst_id,
3281 );
3282 continue;
3283 };
3284 instruments_cache.insert(order.inst_id, instrument.clone());
3285 instrument
3286 };
3287
3288 match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3289 Ok(report) => reports.push(report),
3290 Err(e) => {
3291 log::error!("Failed to parse algo order report: {e}");
3292 }
3293 }
3294 }
3295
3296 Ok(())
3297 }
3298}
3299
3300fn parse_http_algo_order(
3301 order: &OKXOrderAlgo,
3302 account_id: AccountId,
3303 instrument: &InstrumentAny,
3304 ts_init: UnixNanos,
3305) -> anyhow::Result<OrderStatusReport> {
3306 let ord_px = if order.ord_px.is_empty() {
3307 "-1".to_string()
3308 } else {
3309 order.ord_px.clone()
3310 };
3311
3312 let reduce_only = if order.reduce_only.is_empty() {
3313 "false".to_string()
3314 } else {
3315 order.reduce_only.clone()
3316 };
3317
3318 let msg = OKXAlgoOrderMsg {
3319 algo_id: order.algo_id.clone(),
3320 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3321 cl_ord_id: order.cl_ord_id.clone(),
3322 ord_id: order.ord_id.clone(),
3323 inst_id: order.inst_id,
3324 inst_type: order.inst_type,
3325 ord_type: order.ord_type,
3326 state: order.state,
3327 side: order.side,
3328 pos_side: order.pos_side,
3329 sz: order.sz.clone(),
3330 trigger_px: order.trigger_px.clone(),
3331 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3332 ord_px,
3333 td_mode: order.td_mode,
3334 lever: order.lever.clone(),
3335 reduce_only,
3336 actual_px: order.actual_px.clone(),
3337 actual_sz: order.actual_sz.clone(),
3338 notional_usd: order.notional_usd.clone(),
3339 c_time: order.c_time,
3340 u_time: order.u_time,
3341 trigger_time: order.trigger_time.clone(),
3342 tag: order.tag.clone(),
3343 };
3344
3345 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3346}