1use std::{
37 collections::HashMap,
38 fmt::{Debug, Formatter},
39 num::NonZeroU32,
40 str::FromStr,
41 sync::{
42 Arc, LazyLock,
43 atomic::{AtomicBool, Ordering},
44 },
45};
46
47use ahash::{AHashMap, AHashSet};
48use chrono::{DateTime, Utc};
49use dashmap::DashMap;
50use nautilus_core::{
51 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
52};
53use nautilus_model::{
54 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
55 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
56 events::AccountState,
57 identifiers::{AccountId, ClientOrderId, InstrumentId},
58 instruments::{Instrument, InstrumentAny},
59 reports::{FillReport, OrderStatusReport, PositionStatusReport},
60 types::{Price, Quantity},
61};
62use nautilus_network::{
63 http::HttpClient,
64 ratelimiter::quota::Quota,
65 retry::{RetryConfig, RetryManager},
66};
67use reqwest::{Method, StatusCode, header::USER_AGENT};
68use rust_decimal::Decimal;
69use serde::{Deserialize, Serialize, de::DeserializeOwned};
70use tokio_util::sync::CancellationToken;
71use ustr::Ustr;
72
73use super::{
74 error::OKXHttpError,
75 models::{
76 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
77 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
78 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
79 OKXTransactionDetail,
80 },
81 query::{
82 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
83 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
84 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
85 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
86 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
87 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
88 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
89 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
90 },
91};
92use crate::{
93 common::{
94 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
95 credential::Credential,
96 enums::{
97 OKXAlgoOrderType, OKXContractType, OKXInstrumentStatus, OKXInstrumentType,
98 OKXOrderStatus, OKXPositionMode, OKXSide, OKXTradeMode, OKXTriggerType,
99 },
100 models::OKXInstrument,
101 parse::{
102 okx_instrument_type, okx_instrument_type_from_symbol, parse_account_state,
103 parse_candlestick, parse_fill_report, parse_index_price_update, parse_instrument_any,
104 parse_mark_price_update, parse_order_status_report, parse_position_status_report,
105 parse_spot_margin_position_from_balance, parse_trade_tick,
106 },
107 },
108 http::{
109 models::{OKXCandlestick, OKXTrade},
110 query::GetOrderParams,
111 },
112 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
113};
114
115const OKX_SUCCESS_CODE: &str = "0";
116
117pub static OKX_REST_QUOTA: LazyLock<Quota> =
126 LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
127
128const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
129
130#[derive(Debug, Serialize, Deserialize)]
132pub struct OKXResponse<T> {
133 pub code: String,
135 pub msg: String,
137 pub data: Vec<T>,
139}
140
141pub struct OKXRawHttpClient {
147 base_url: String,
148 client: HttpClient,
149 credential: Option<Credential>,
150 retry_manager: RetryManager<OKXHttpError>,
151 cancellation_token: CancellationToken,
152 is_demo: bool,
153}
154
155impl Default for OKXRawHttpClient {
156 fn default() -> Self {
157 Self::new(None, Some(60), None, None, None, false, None)
158 .expect("Failed to create default OKXRawHttpClient")
159 }
160}
161
162impl Debug for OKXRawHttpClient {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 let credential = self.credential.as_ref().map(|_| "<redacted>");
165 f.debug_struct(stringify!(OKXRawHttpClient))
166 .field("base_url", &self.base_url)
167 .field("credential", &credential)
168 .finish_non_exhaustive()
169 }
170}
171
172impl OKXRawHttpClient {
173 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
174 vec![
175 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
176 (
177 "okx:/api/v5/account/balance".to_string(),
178 Quota::per_second(NonZeroU32::new(5).unwrap()),
179 ),
180 (
181 "okx:/api/v5/public/instruments".to_string(),
182 Quota::per_second(NonZeroU32::new(10).unwrap()),
183 ),
184 (
185 "okx:/api/v5/market/candles".to_string(),
186 Quota::per_second(NonZeroU32::new(50).unwrap()),
187 ),
188 (
189 "okx:/api/v5/market/history-candles".to_string(),
190 Quota::per_second(NonZeroU32::new(20).unwrap()),
191 ),
192 (
193 "okx:/api/v5/market/history-trades".to_string(),
194 Quota::per_second(NonZeroU32::new(30).unwrap()),
195 ),
196 (
197 "okx:/api/v5/trade/order".to_string(),
198 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
200 (
201 "okx:/api/v5/trade/orders-pending".to_string(),
202 Quota::per_second(NonZeroU32::new(20).unwrap()),
203 ),
204 (
205 "okx:/api/v5/trade/orders-history".to_string(),
206 Quota::per_second(NonZeroU32::new(20).unwrap()),
207 ),
208 (
209 "okx:/api/v5/trade/fills".to_string(),
210 Quota::per_second(NonZeroU32::new(30).unwrap()),
211 ),
212 (
213 "okx:/api/v5/trade/order-algo".to_string(),
214 Quota::per_second(NonZeroU32::new(10).unwrap()),
215 ),
216 (
217 "okx:/api/v5/trade/cancel-algos".to_string(),
218 Quota::per_second(NonZeroU32::new(10).unwrap()),
219 ),
220 ]
221 }
222
223 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
224 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
225 let route = format!("okx:{normalized}");
226
227 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
228 }
229
230 pub fn cancel_all_requests(&self) {
232 self.cancellation_token.cancel();
233 }
234
235 pub fn cancellation_token(&self) -> &CancellationToken {
237 &self.cancellation_token
238 }
239
240 pub fn new(
250 base_url: Option<String>,
251 timeout_secs: Option<u64>,
252 max_retries: Option<u32>,
253 retry_delay_ms: Option<u64>,
254 retry_delay_max_ms: Option<u64>,
255 is_demo: bool,
256 proxy_url: Option<String>,
257 ) -> Result<Self, OKXHttpError> {
258 let retry_config = RetryConfig {
259 max_retries: max_retries.unwrap_or(3),
260 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
261 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
262 backoff_factor: 2.0,
263 jitter_ms: 1000,
264 operation_timeout_ms: Some(60_000),
265 immediate_first: false,
266 max_elapsed_ms: Some(180_000),
267 };
268
269 let retry_manager = RetryManager::new(retry_config);
270
271 Ok(Self {
272 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
273 client: HttpClient::new(
274 Self::default_headers(is_demo),
275 vec![],
276 Self::rate_limiter_quotas(),
277 Some(*OKX_REST_QUOTA),
278 timeout_secs,
279 proxy_url,
280 )
281 .map_err(|e| {
282 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
283 })?,
284 credential: None,
285 retry_manager,
286 cancellation_token: CancellationToken::new(),
287 is_demo,
288 })
289 }
290
291 #[allow(clippy::too_many_arguments)]
298 pub fn with_credentials(
299 api_key: String,
300 api_secret: String,
301 api_passphrase: String,
302 base_url: String,
303 timeout_secs: Option<u64>,
304 max_retries: Option<u32>,
305 retry_delay_ms: Option<u64>,
306 retry_delay_max_ms: Option<u64>,
307 is_demo: bool,
308 proxy_url: Option<String>,
309 ) -> Result<Self, OKXHttpError> {
310 let retry_config = RetryConfig {
311 max_retries: max_retries.unwrap_or(3),
312 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
313 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
314 backoff_factor: 2.0,
315 jitter_ms: 1000,
316 operation_timeout_ms: Some(60_000),
317 immediate_first: false,
318 max_elapsed_ms: Some(180_000),
319 };
320
321 let retry_manager = RetryManager::new(retry_config);
322
323 Ok(Self {
324 base_url,
325 client: HttpClient::new(
326 Self::default_headers(is_demo),
327 vec![],
328 Self::rate_limiter_quotas(),
329 Some(*OKX_REST_QUOTA),
330 timeout_secs,
331 proxy_url,
332 )
333 .map_err(|e| {
334 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
335 })?,
336 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
337 retry_manager,
338 cancellation_token: CancellationToken::new(),
339 is_demo,
340 })
341 }
342
343 fn default_headers(is_demo: bool) -> HashMap<String, String> {
345 let mut headers =
346 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
347
348 if is_demo {
349 headers.insert("x-simulated-trading".to_string(), "1".to_string());
350 }
351
352 headers
353 }
354
355 fn sign_request(
362 &self,
363 method: &Method,
364 path: &str,
365 body: Option<&[u8]>,
366 ) -> Result<HashMap<String, String>, OKXHttpError> {
367 let credential = match self.credential.as_ref() {
368 Some(c) => c,
369 None => return Err(OKXHttpError::MissingCredentials),
370 };
371
372 let api_key = credential.api_key.to_string();
373 let api_passphrase = credential.api_passphrase.clone();
374
375 let now = Utc::now();
377 let millis = now.timestamp_subsec_millis();
378 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
379 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
380
381 let mut headers = HashMap::new();
382 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
383 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
384 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
385 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
386
387 Ok(headers)
388 }
389
390 async fn send_request<T: DeserializeOwned, P: Serialize>(
406 &self,
407 method: Method,
408 path: &str,
409 params: Option<&P>,
410 body: Option<Vec<u8>>,
411 authenticate: bool,
412 ) -> Result<Vec<T>, OKXHttpError> {
413 let url = format!("{}{path}", self.base_url);
414 let endpoint = path.to_string();
415 let method_clone = method.clone();
416 let body_clone = body.clone();
417
418 let operation = || {
419 let url = url.clone();
420 let method = method_clone.clone();
421 let body = body_clone.clone();
422 let endpoint = endpoint.clone();
423
424 async move {
425 let query_string = if let Some(p) = params {
427 serde_urlencoded::to_string(p).map_err(|e| {
428 OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
429 })?
430 } else {
431 String::new()
432 };
433
434 let full_path = if query_string.is_empty() {
436 endpoint.clone()
437 } else {
438 format!("{}?{}", endpoint, query_string)
439 };
440
441 let mut headers = if authenticate {
442 self.sign_request(&method, &full_path, body.as_deref())?
443 } else {
444 HashMap::new()
445 };
446
447 if body.is_some() {
449 headers.insert("Content-Type".to_string(), "application/json".to_string());
450 }
451
452 let rate_keys = Self::rate_limit_keys(endpoint.as_str())
453 .into_iter()
454 .map(|k| k.to_string())
455 .collect();
456 let resp = self
457 .client
458 .request_with_params(
459 method.clone(),
460 url,
461 params,
462 Some(headers),
463 body,
464 None,
465 Some(rate_keys),
466 )
467 .await?;
468
469 tracing::trace!("Response: {resp:?}");
470
471 if resp.status.is_success() {
472 let okx_response: OKXResponse<T> =
473 serde_json::from_slice(&resp.body).map_err(|e| {
474 tracing::error!("Failed to deserialize OKXResponse: {e}");
475 OKXHttpError::JsonError(e.to_string())
476 })?;
477
478 if okx_response.code != OKX_SUCCESS_CODE {
479 return Err(OKXHttpError::OkxError {
480 error_code: okx_response.code,
481 message: okx_response.msg,
482 });
483 }
484
485 Ok(okx_response.data)
486 } else {
487 let error_body = String::from_utf8_lossy(&resp.body);
488 if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
489 tracing::debug!("HTTP 404 with body: {error_body}");
490 } else {
491 tracing::error!(
492 "HTTP error {} with body: {error_body}",
493 resp.status.as_str()
494 );
495 }
496
497 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
498 return Err(OKXHttpError::OkxError {
499 error_code: parsed_error.code,
500 message: parsed_error.msg,
501 });
502 }
503
504 Err(OKXHttpError::UnexpectedStatus {
505 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
506 body: error_body.to_string(),
507 })
508 }
509 }
510 };
511
512 let should_retry = |error: &OKXHttpError| -> bool {
521 match error {
522 OKXHttpError::HttpClientError(_) => true,
523 OKXHttpError::UnexpectedStatus { status, .. } => {
524 status.as_u16() >= 500 || status.as_u16() == 429
525 }
526 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
527 _ => false,
528 }
529 };
530
531 let create_error = |msg: String| -> OKXHttpError {
532 if msg == "canceled" {
533 OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
534 } else {
535 OKXHttpError::ValidationError(msg)
536 }
537 };
538
539 self.retry_manager
540 .execute_with_retry_with_cancel(
541 endpoint.as_str(),
542 operation,
543 should_retry,
544 create_error,
545 &self.cancellation_token,
546 )
547 .await
548 }
549
550 pub async fn set_position_mode(
561 &self,
562 params: SetPositionModeParams,
563 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
564 let path = "/api/v5/account/set-position-mode";
565 let body = serde_json::to_vec(¶ms)?;
566 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
567 .await
568 }
569
570 pub async fn get_position_tiers(
581 &self,
582 params: GetPositionTiersParams,
583 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
584 self.send_request(
585 Method::GET,
586 "/api/v5/public/position-tiers",
587 Some(¶ms),
588 None,
589 false,
590 )
591 .await
592 }
593
594 pub async fn get_instruments(
605 &self,
606 params: GetInstrumentsParams,
607 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
608 self.send_request(
609 Method::GET,
610 "/api/v5/public/instruments",
611 Some(¶ms),
612 None,
613 false,
614 )
615 .await
616 }
617
618 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
632 let response: Vec<OKXServerTime> = self
633 .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
634 .await?;
635 response
636 .first()
637 .map(|t| t.ts)
638 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
639 }
640
641 pub async fn get_mark_price(
655 &self,
656 params: GetMarkPriceParams,
657 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
658 self.send_request(
659 Method::GET,
660 "/api/v5/public/mark-price",
661 Some(¶ms),
662 None,
663 false,
664 )
665 .await
666 }
667
668 pub async fn get_index_tickers(
678 &self,
679 params: GetIndexTickerParams,
680 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
681 self.send_request(
682 Method::GET,
683 "/api/v5/market/index-tickers",
684 Some(¶ms),
685 None,
686 false,
687 )
688 .await
689 }
690
691 pub async fn get_history_trades(
701 &self,
702 params: GetTradesParams,
703 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
704 self.send_request(
705 Method::GET,
706 "/api/v5/market/history-trades",
707 Some(¶ms),
708 None,
709 false,
710 )
711 .await
712 }
713
714 pub async fn get_candles(
724 &self,
725 params: GetCandlesticksParams,
726 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
727 self.send_request(
728 Method::GET,
729 "/api/v5/market/candles",
730 Some(¶ms),
731 None,
732 false,
733 )
734 .await
735 }
736
737 pub async fn get_history_candles(
747 &self,
748 params: GetCandlesticksParams,
749 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
750 self.send_request(
751 Method::GET,
752 "/api/v5/market/history-candles",
753 Some(¶ms),
754 None,
755 false,
756 )
757 .await
758 }
759
760 pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
771 let path = "/api/v5/account/balance";
772 self.send_request::<_, ()>(Method::GET, path, None, None, true)
773 .await
774 }
775
776 pub async fn get_trade_fee(
788 &self,
789 params: GetTradeFeeParams,
790 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
791 self.send_request(
792 Method::GET,
793 "/api/v5/account/trade-fee",
794 Some(¶ms),
795 None,
796 true,
797 )
798 .await
799 }
800
801 pub async fn get_order(
811 &self,
812 params: GetOrderParams,
813 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
814 self.send_request(
815 Method::GET,
816 "/api/v5/trade/order",
817 Some(¶ms),
818 None,
819 true,
820 )
821 .await
822 }
823
824 pub async fn get_orders_pending(
834 &self,
835 params: GetOrderListParams,
836 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
837 self.send_request(
838 Method::GET,
839 "/api/v5/trade/orders-pending",
840 Some(¶ms),
841 None,
842 true,
843 )
844 .await
845 }
846
847 pub async fn get_orders_history(
857 &self,
858 params: GetOrderHistoryParams,
859 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
860 self.send_request(
861 Method::GET,
862 "/api/v5/trade/orders-history",
863 Some(¶ms),
864 None,
865 true,
866 )
867 .await
868 }
869
870 pub async fn get_order_algo_pending(
876 &self,
877 params: GetAlgoOrdersParams,
878 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
879 self.send_request(
880 Method::GET,
881 "/api/v5/trade/order-algo-pending",
882 Some(¶ms),
883 None,
884 true,
885 )
886 .await
887 }
888
889 pub async fn get_order_algo_history(
895 &self,
896 params: GetAlgoOrdersParams,
897 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
898 self.send_request(
899 Method::GET,
900 "/api/v5/trade/order-algo-history",
901 Some(¶ms),
902 None,
903 true,
904 )
905 .await
906 }
907
908 pub async fn get_fills(
918 &self,
919 params: GetTransactionDetailsParams,
920 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
921 self.send_request(
922 Method::GET,
923 "/api/v5/trade/fills",
924 Some(¶ms),
925 None,
926 true,
927 )
928 .await
929 }
930
931 pub async fn get_positions(
943 &self,
944 params: GetPositionsParams,
945 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
946 self.send_request(
947 Method::GET,
948 "/api/v5/account/positions",
949 Some(¶ms),
950 None,
951 true,
952 )
953 .await
954 }
955
956 pub async fn get_positions_history(
966 &self,
967 params: GetPositionsHistoryParams,
968 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
969 self.send_request(
970 Method::GET,
971 "/api/v5/account/positions-history",
972 Some(¶ms),
973 None,
974 true,
975 )
976 .await
977 }
978}
979
980#[derive(Debug)]
985#[cfg_attr(
986 feature = "python",
987 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
988)]
989pub struct OKXHttpClient {
990 pub(crate) inner: Arc<OKXRawHttpClient>,
991 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
992 cache_initialized: AtomicBool,
993}
994
995impl Clone for OKXHttpClient {
996 fn clone(&self) -> Self {
997 let cache_initialized = AtomicBool::new(false);
998
999 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1000 if is_initialized {
1001 cache_initialized.store(true, Ordering::Release);
1002 }
1003
1004 Self {
1005 inner: self.inner.clone(),
1006 instruments_cache: self.instruments_cache.clone(),
1007 cache_initialized,
1008 }
1009 }
1010}
1011
1012impl Default for OKXHttpClient {
1013 fn default() -> Self {
1014 Self::new(None, Some(60), None, None, None, false, None)
1015 .expect("Failed to create default OKXHttpClient")
1016 }
1017}
1018
1019impl OKXHttpClient {
1020 pub fn new(
1030 base_url: Option<String>,
1031 timeout_secs: Option<u64>,
1032 max_retries: Option<u32>,
1033 retry_delay_ms: Option<u64>,
1034 retry_delay_max_ms: Option<u64>,
1035 is_demo: bool,
1036 proxy_url: Option<String>,
1037 ) -> anyhow::Result<Self> {
1038 Ok(Self {
1039 inner: Arc::new(OKXRawHttpClient::new(
1040 base_url,
1041 timeout_secs,
1042 max_retries,
1043 retry_delay_ms,
1044 retry_delay_max_ms,
1045 is_demo,
1046 proxy_url,
1047 )?),
1048 instruments_cache: Arc::new(DashMap::new()),
1049 cache_initialized: AtomicBool::new(false),
1050 })
1051 }
1052
1053 fn generate_ts_init(&self) -> UnixNanos {
1055 get_atomic_clock_realtime().get_time_ns()
1056 }
1057
1058 pub fn from_env() -> anyhow::Result<Self> {
1065 Self::with_credentials(None, None, None, None, None, None, None, None, false, None)
1066 }
1067
1068 #[allow(clippy::too_many_arguments)]
1075 pub fn with_credentials(
1076 api_key: Option<String>,
1077 api_secret: Option<String>,
1078 api_passphrase: Option<String>,
1079 base_url: Option<String>,
1080 timeout_secs: Option<u64>,
1081 max_retries: Option<u32>,
1082 retry_delay_ms: Option<u64>,
1083 retry_delay_max_ms: Option<u64>,
1084 is_demo: bool,
1085 proxy_url: Option<String>,
1086 ) -> anyhow::Result<Self> {
1087 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1088 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1089 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1090 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1091
1092 Ok(Self {
1093 inner: Arc::new(OKXRawHttpClient::with_credentials(
1094 api_key,
1095 api_secret,
1096 api_passphrase,
1097 base_url,
1098 timeout_secs,
1099 max_retries,
1100 retry_delay_ms,
1101 retry_delay_max_ms,
1102 is_demo,
1103 proxy_url,
1104 )?),
1105 instruments_cache: Arc::new(DashMap::new()),
1106 cache_initialized: AtomicBool::new(false),
1107 })
1108 }
1109
1110 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1116 self.instruments_cache
1117 .get(&symbol)
1118 .map(|entry| entry.value().clone())
1119 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1120 }
1121
1122 pub fn cancel_all_requests(&self) {
1124 self.inner.cancel_all_requests();
1125 }
1126
1127 pub fn cancellation_token(&self) -> &CancellationToken {
1129 self.inner.cancellation_token()
1130 }
1131
1132 pub fn base_url(&self) -> &str {
1134 self.inner.base_url.as_str()
1135 }
1136
1137 pub fn api_key(&self) -> Option<&str> {
1139 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1140 }
1141
1142 #[must_use]
1144 pub fn api_key_masked(&self) -> Option<String> {
1145 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1146 }
1147
1148 #[must_use]
1150 pub fn is_demo(&self) -> bool {
1151 self.inner.is_demo
1152 }
1153
1154 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1162 self.inner.get_server_time().await
1163 }
1164
1165 #[must_use]
1169 pub fn is_initialized(&self) -> bool {
1170 self.cache_initialized.load(Ordering::Acquire)
1171 }
1172
1173 #[must_use]
1176 pub fn get_cached_symbols(&self) -> Vec<String> {
1177 self.instruments_cache
1178 .iter()
1179 .map(|entry| entry.key().to_string())
1180 .collect()
1181 }
1182
1183 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1187 for inst in instruments {
1188 self.instruments_cache
1189 .insert(inst.raw_symbol().inner(), inst);
1190 }
1191 self.cache_initialized.store(true, Ordering::Release);
1192 }
1193
1194 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1198 self.instruments_cache
1199 .insert(instrument.raw_symbol().inner(), instrument);
1200 self.cache_initialized.store(true, Ordering::Release);
1201 }
1202
1203 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1205 self.instruments_cache
1206 .get(symbol)
1207 .map(|entry| entry.value().clone())
1208 }
1209
1210 pub async fn request_account_state(
1216 &self,
1217 account_id: AccountId,
1218 ) -> anyhow::Result<AccountState> {
1219 let resp = self
1220 .inner
1221 .get_balance()
1222 .await
1223 .map_err(|e| anyhow::anyhow!(e))?;
1224
1225 let ts_init = self.generate_ts_init();
1226 let raw = resp
1227 .first()
1228 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1229 let account_state = parse_account_state(raw, account_id, ts_init)?;
1230
1231 Ok(account_state)
1232 }
1233
1234 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1247 let mut params = SetPositionModeParamsBuilder::default();
1248 params.pos_mode(position_mode);
1249 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1250
1251 match self.inner.set_position_mode(params).await {
1252 Ok(_) => Ok(()),
1253 Err(e) => {
1254 if let OKXHttpError::OkxError {
1255 error_code,
1256 message,
1257 } = &e
1258 && error_code == "50115"
1259 {
1260 tracing::warn!(
1261 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1262 );
1263 return Ok(()); }
1265 anyhow::bail!(e)
1266 }
1267 }
1268 }
1269
1270 pub async fn request_instruments(
1276 &self,
1277 instrument_type: OKXInstrumentType,
1278 instrument_family: Option<String>,
1279 ) -> anyhow::Result<Vec<InstrumentAny>> {
1280 let mut params = GetInstrumentsParamsBuilder::default();
1281 params.inst_type(instrument_type);
1282
1283 if let Some(family) = instrument_family.clone() {
1284 params.inst_family(family);
1285 }
1286
1287 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1288
1289 let resp = self
1290 .inner
1291 .get_instruments(params)
1292 .await
1293 .map_err(|e| anyhow::anyhow!(e))?;
1294
1295 let fee_rate_opt = {
1296 let fee_params = GetTradeFeeParams {
1297 inst_type: instrument_type,
1298 uly: None,
1299 inst_family: instrument_family,
1300 };
1301
1302 match self.inner.get_trade_fee(fee_params).await {
1303 Ok(rates) => rates.into_iter().next(),
1304 Err(OKXHttpError::MissingCredentials) => {
1305 tracing::debug!("Missing credentials for fee rates, using None");
1306 None
1307 }
1308 Err(e) => {
1309 tracing::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1310 None
1311 }
1312 }
1313 };
1314
1315 let ts_init = self.generate_ts_init();
1316
1317 let mut instruments: Vec<InstrumentAny> = Vec::new();
1318 for inst in &resp {
1319 if inst.state == OKXInstrumentStatus::Preopen {
1322 continue;
1323 }
1324
1325 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1327 let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1328 let (maker_str, taker_str) = if is_usdt_margined {
1329 (&fee_rate.maker_u, &fee_rate.taker_u)
1330 } else {
1331 (&fee_rate.maker, &fee_rate.taker)
1332 };
1333
1334 let maker = if !maker_str.is_empty() {
1335 Decimal::from_str(maker_str).ok()
1336 } else {
1337 None
1338 };
1339 let taker = if !taker_str.is_empty() {
1340 Decimal::from_str(taker_str).ok()
1341 } else {
1342 None
1343 };
1344
1345 (maker, taker)
1346 } else {
1347 (None, None)
1348 };
1349
1350 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1351 Ok(Some(instrument_any)) => {
1352 instruments.push(instrument_any);
1353 }
1354 Ok(None) => {
1355 }
1357 Err(e) => {
1358 tracing::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1359 }
1360 }
1361 }
1362
1363 Ok(instruments)
1364 }
1365
1366 pub async fn request_instrument(
1377 &self,
1378 instrument_id: InstrumentId,
1379 ) -> anyhow::Result<InstrumentAny> {
1380 let symbol = instrument_id.symbol.as_str();
1381 let instrument_type = okx_instrument_type_from_symbol(symbol);
1382
1383 let mut params = GetInstrumentsParamsBuilder::default();
1384 params.inst_type(instrument_type);
1385 params.inst_id(symbol);
1386
1387 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1388
1389 let resp = self
1390 .inner
1391 .get_instruments(params)
1392 .await
1393 .map_err(|e| anyhow::anyhow!(e))?;
1394
1395 let raw_inst = resp
1396 .first()
1397 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1398
1399 if raw_inst.state == OKXInstrumentStatus::Preopen {
1401 anyhow::bail!("Instrument {symbol} is in pre-open state");
1402 }
1403
1404 let fee_rate_opt = {
1405 let fee_params = GetTradeFeeParams {
1406 inst_type: instrument_type,
1407 uly: None,
1408 inst_family: None,
1409 };
1410
1411 match self.inner.get_trade_fee(fee_params).await {
1412 Ok(rates) => rates.into_iter().next(),
1413 Err(OKXHttpError::MissingCredentials) => {
1414 tracing::debug!("Missing credentials for fee rates, using None");
1415 None
1416 }
1417 Err(e) => {
1418 tracing::warn!("Failed to fetch fee rates for {symbol}: {e}");
1419 None
1420 }
1421 }
1422 };
1423
1424 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1425 let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1426 let (maker_str, taker_str) = if is_usdt_margined {
1427 (&fee_rate.maker_u, &fee_rate.taker_u)
1428 } else {
1429 (&fee_rate.maker, &fee_rate.taker)
1430 };
1431
1432 let maker = if !maker_str.is_empty() {
1433 Decimal::from_str(maker_str).ok()
1434 } else {
1435 None
1436 };
1437 let taker = if !taker_str.is_empty() {
1438 Decimal::from_str(taker_str).ok()
1439 } else {
1440 None
1441 };
1442
1443 (maker, taker)
1444 } else {
1445 (None, None)
1446 };
1447
1448 let ts_init = self.generate_ts_init();
1449 let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1450 .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1451
1452 self.cache_instrument(instrument.clone());
1453
1454 Ok(instrument)
1455 }
1456
1457 pub async fn request_mark_price(
1463 &self,
1464 instrument_id: InstrumentId,
1465 ) -> anyhow::Result<MarkPriceUpdate> {
1466 let mut params = GetMarkPriceParamsBuilder::default();
1467 params.inst_id(instrument_id.symbol.inner());
1468 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1469
1470 let resp = self
1471 .inner
1472 .get_mark_price(params)
1473 .await
1474 .map_err(|e| anyhow::anyhow!(e))?;
1475
1476 let raw = resp
1477 .first()
1478 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1479 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1480 let ts_init = self.generate_ts_init();
1481
1482 let mark_price =
1483 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1484 .map_err(|e| anyhow::anyhow!(e))?;
1485 Ok(mark_price)
1486 }
1487
1488 pub async fn request_index_price(
1494 &self,
1495 instrument_id: InstrumentId,
1496 ) -> anyhow::Result<IndexPriceUpdate> {
1497 let mut params = GetIndexTickerParamsBuilder::default();
1498 params.inst_id(instrument_id.symbol.inner());
1499 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1500
1501 let resp = self
1502 .inner
1503 .get_index_tickers(params)
1504 .await
1505 .map_err(|e| anyhow::anyhow!(e))?;
1506
1507 let raw = resp
1508 .first()
1509 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1510 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1511 let ts_init = self.generate_ts_init();
1512
1513 let index_price =
1514 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1515 .map_err(|e| anyhow::anyhow!(e))?;
1516 Ok(index_price)
1517 }
1518
1519 pub async fn request_trades(
1529 &self,
1530 instrument_id: InstrumentId,
1531 start: Option<DateTime<Utc>>,
1532 end: Option<DateTime<Utc>>,
1533 limit: Option<u32>,
1534 ) -> anyhow::Result<Vec<TradeTick>> {
1535 const OKX_TRADES_MAX_LIMIT: u32 = 100;
1536
1537 let limit = if limit == Some(0) { None } else { limit };
1538
1539 if let (Some(s), Some(e)) = (start, end) {
1540 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1541 }
1542
1543 let now = Utc::now();
1544
1545 if let Some(s) = start
1546 && s > now
1547 {
1548 return Ok(Vec::new());
1549 }
1550
1551 let end = if let Some(e) = end
1552 && e > now
1553 {
1554 Some(now)
1555 } else {
1556 end
1557 };
1558
1559 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1560 enum Mode {
1561 Latest,
1562 Backward,
1563 Range,
1564 }
1565
1566 let mode = match (start, end) {
1567 (None, None) => Mode::Latest,
1568 (Some(_), None) => Mode::Backward,
1569 (None, Some(_)) => Mode::Backward,
1570 (Some(_), Some(_)) => Mode::Range,
1571 };
1572
1573 let start_ms = start.map(|s| s.timestamp_millis());
1574 let end_ms = end.map(|e| e.timestamp_millis());
1575
1576 let ts_init = self.generate_ts_init();
1577 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1578
1579 if matches!(mode, Mode::Backward | Mode::Range) {
1582 let mut before_trade_id: Option<String> = None;
1583 let mut pages = 0usize;
1584 let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
1585 let mut seen_trades: std::collections::HashSet<(String, i64)> =
1586 std::collections::HashSet::new();
1587 let mut unique_count = 0usize;
1588 let mut consecutive_empty_pages = 0usize;
1589 const MAX_PAGES: usize = 500;
1590 const MAX_CONSECUTIVE_EMPTY: usize = 3;
1591
1592 let effective_limit = if start.is_some() {
1595 limit.unwrap_or(u32::MAX)
1596 } else {
1597 limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
1598 };
1599
1600 tracing::debug!(
1601 "Starting trades pagination: mode={:?}, start={:?}, end={:?}, limit={:?}, effective_limit={}",
1602 mode,
1603 start,
1604 end,
1605 limit,
1606 effective_limit
1607 );
1608
1609 loop {
1610 if pages >= MAX_PAGES {
1611 tracing::warn!("Hit MAX_PAGES limit of {}", MAX_PAGES);
1612 break;
1613 }
1614
1615 if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
1616 tracing::debug!("Reached effective limit: unique_count={}", unique_count);
1617 break;
1618 }
1619
1620 let remaining = (effective_limit as usize).saturating_sub(unique_count);
1621 let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
1622
1623 tracing::debug!(
1624 "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
1625 pages + 1,
1626 before_trade_id,
1627 page_cap,
1628 unique_count
1629 );
1630
1631 let mut params_builder = GetTradesParamsBuilder::default();
1632 params_builder
1633 .inst_id(instrument_id.symbol.inner())
1634 .limit(page_cap)
1635 .pagination_type(1);
1636
1637 if let Some(ref before_id) = before_trade_id {
1639 params_builder.after(before_id.clone());
1640 }
1641
1642 let params = params_builder.build().map_err(anyhow::Error::new)?;
1643 let raw = self
1644 .inner
1645 .get_history_trades(params)
1646 .await
1647 .map_err(anyhow::Error::new)?;
1648
1649 tracing::debug!("Received {} raw trades from API", raw.len());
1650
1651 if !raw.is_empty() {
1652 let first_id = &raw.first().unwrap().trade_id;
1653 let last_id = &raw.last().unwrap().trade_id;
1654 tracing::debug!(
1655 "Raw response trade ID range: first={} (newest), last={} (oldest)",
1656 first_id,
1657 last_id
1658 );
1659 }
1660
1661 if raw.is_empty() {
1662 tracing::debug!("API returned empty page, stopping pagination");
1663 break;
1664 }
1665
1666 pages += 1;
1667
1668 let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1669 let mut hit_start_boundary = false;
1670 let mut filtered_out = 0usize;
1671 let mut duplicates = 0usize;
1672
1673 for r in &raw {
1674 match parse_trade_tick(
1675 r,
1676 instrument_id,
1677 inst.price_precision(),
1678 inst.size_precision(),
1679 ts_init,
1680 ) {
1681 Ok(trade) => {
1682 let ts_ms = trade.ts_event.as_i64() / 1_000_000;
1683
1684 if let Some(e_ms) = end_ms
1685 && ts_ms > e_ms
1686 {
1687 filtered_out += 1;
1688 continue;
1689 }
1690
1691 if let Some(s_ms) = start_ms
1692 && ts_ms < s_ms
1693 {
1694 hit_start_boundary = true;
1695 filtered_out += 1;
1696 break;
1697 }
1698
1699 let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
1700 if seen_trades.insert(trade_key) {
1701 unique_count += 1;
1702 page_trades.push(trade);
1703 } else {
1704 duplicates += 1;
1705 }
1706 }
1707 Err(e) => tracing::error!("{e}"),
1708 }
1709 }
1710
1711 tracing::debug!(
1712 "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
1713 pages,
1714 page_trades.len(),
1715 filtered_out,
1716 duplicates,
1717 hit_start_boundary
1718 );
1719
1720 let oldest_trade_id = if !page_trades.is_empty() {
1722 let oldest_id = page_trades.last().map(|t| {
1724 let id = t.trade_id.to_string();
1725 tracing::debug!(
1726 "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
1727 id,
1728 t.ts_event.as_i64()
1729 );
1730 id
1731 });
1732 page_trades.reverse();
1733 page_results.push(page_trades);
1734 consecutive_empty_pages = 0;
1735 oldest_id
1736 } else {
1737 if unique_count > 0 {
1740 consecutive_empty_pages += 1;
1741 if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
1742 tracing::debug!(
1743 "Stopping: {} consecutive pages with no trades in range after collecting {} trades",
1744 consecutive_empty_pages,
1745 unique_count
1746 );
1747 break;
1748 }
1749 }
1750 raw.last().map(|t| {
1752 let id = t.trade_id.to_string();
1753 tracing::debug!(
1754 "Setting cursor from raw response (no unique trades): oldest_id={}",
1755 id
1756 );
1757 id
1758 })
1759 };
1760
1761 if let Some(ref old_id) = before_trade_id
1762 && oldest_trade_id.as_ref() == Some(old_id)
1763 {
1764 break;
1765 }
1766
1767 if oldest_trade_id.is_none() {
1768 break;
1769 }
1770
1771 before_trade_id = oldest_trade_id;
1772
1773 if hit_start_boundary {
1774 break;
1775 }
1776
1777 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1778 }
1779
1780 tracing::debug!(
1781 "Pagination complete: {} pages, {} unique trades collected",
1782 pages,
1783 unique_count
1784 );
1785
1786 let mut out: Vec<TradeTick> = Vec::new();
1787 for page in page_results.into_iter().rev() {
1788 out.extend(page);
1789 }
1790
1791 let mut dedup_keys = std::collections::HashSet::new();
1793 let pre_dedup_len = out.len();
1794 out.retain(|trade| {
1795 dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
1796 });
1797
1798 if out.len() < pre_dedup_len {
1799 tracing::debug!(
1800 "Removed {} duplicate trades during final dedup",
1801 pre_dedup_len - out.len()
1802 );
1803 }
1804
1805 if let Some(lim) = limit
1806 && lim > 0
1807 && out.len() > lim as usize
1808 {
1809 let excess = out.len() - lim as usize;
1810 tracing::debug!("Trimming {} oldest trades to respect limit={}", excess, lim);
1811 out.drain(0..excess);
1812 }
1813
1814 tracing::debug!("Returning {} trades", out.len());
1815 return Ok(out);
1816 }
1817
1818 let req_limit = limit
1819 .unwrap_or(OKX_TRADES_MAX_LIMIT)
1820 .min(OKX_TRADES_MAX_LIMIT);
1821 let params = GetTradesParamsBuilder::default()
1822 .inst_id(instrument_id.symbol.inner())
1823 .limit(req_limit)
1824 .build()
1825 .map_err(anyhow::Error::new)?;
1826
1827 let raw = self
1828 .inner
1829 .get_history_trades(params)
1830 .await
1831 .map_err(anyhow::Error::new)?;
1832
1833 let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
1834 for r in &raw {
1835 match parse_trade_tick(
1836 r,
1837 instrument_id,
1838 inst.price_precision(),
1839 inst.size_precision(),
1840 ts_init,
1841 ) {
1842 Ok(trade) => trades.push(trade),
1843 Err(e) => tracing::error!("{e}"),
1844 }
1845 }
1846
1847 trades.reverse();
1849
1850 if let Some(lim) = limit
1851 && lim > 0
1852 && trades.len() > lim as usize
1853 {
1854 trades.drain(0..trades.len() - lim as usize);
1855 }
1856
1857 Ok(trades)
1858 }
1859
/// Requests historical bars for `bar_type`, paginating the OKX candlestick endpoints.
///
/// The request shape is selected by which bounds are supplied:
/// - neither `start` nor `end` (`Latest`): start from the most recent candles and page
///   towards older data;
/// - exactly one bound (`Backward`): page towards older data;
/// - both bounds (`Range`): page within the window, switching between prepending older
///   pages and appending newer pages as needed.
///
/// A `limit` of `Some(0)` is treated as "no limit". An `end` in the future is clamped to
/// the current time, and a `start` in the future yields an empty vector. Parsed pages are
/// reversed before being merged into the output.
///
/// # Errors
///
/// Returns an error if the bar type is not EXTERNAL aggregation, if `start >= end`, if the
/// aggregation is not one of second/minute/hour/day/week/month, if the instrument is not
/// in the cache, if request parameters cannot be built, if an HTTP request fails, or if
/// any candlestick fails to parse.
pub async fn request_bars(
    &self,
    bar_type: BarType,
    start: Option<DateTime<Utc>>,
    mut end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<Bar>> {
    // Age threshold (in days) used by the fallback requests to pick the history endpoint.
    // NOTE(review): the main loop uses a 1-hour age threshold instead; confirm which is intended.
    const HISTORY_SPLIT_DAYS: i64 = 100;
    // Hard stop on the number of pages processed.
    const MAX_PAGES_SOFT: usize = 500;

    // A zero limit means "unbounded".
    let limit = if limit == Some(0) { None } else { limit };

    anyhow::ensure!(
        bar_type.aggregation_source() == AggregationSource::External,
        "Only EXTERNAL aggregation is supported"
    );

    if let (Some(s), Some(e)) = (start, end) {
        anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
    }

    let now = Utc::now();

    // A window that begins in the future cannot contain any bars.
    if let Some(s) = start
        && s > now
    {
        return Ok(Vec::new());
    }
    // Clamp a future `end` to the present.
    if let Some(e) = end
        && e > now
    {
        end = Some(now);
    }

    // Build the `bar` request parameter from the bar specification.
    let spec = bar_type.spec();
    let step = spec.step.get();
    let bar_param = match spec.aggregation {
        BarAggregation::Second => format!("{step}s"),
        BarAggregation::Minute => format!("{step}m"),
        BarAggregation::Hour => format!("{step}H"),
        BarAggregation::Day => format!("{step}D"),
        BarAggregation::Week => format!("{step}W"),
        BarAggregation::Month => format!("{step}M"),
        a => anyhow::bail!("OKX does not support {a:?} aggregation"),
    };

    // Width of one bar in milliseconds; a month is approximated as 30 days.
    let slot_ms: i64 = match spec.aggregation {
        BarAggregation::Second => (step as i64) * 1_000,
        BarAggregation::Minute => (step as i64) * 60_000,
        BarAggregation::Hour => (step as i64) * 3_600_000,
        BarAggregation::Day => (step as i64) * 86_400_000,
        BarAggregation::Week => (step as i64) * 7 * 86_400_000,
        BarAggregation::Month => (step as i64) * 30 * 86_400_000,
        _ => unreachable!("Unsupported aggregation should have been caught above"),
    };
    let slot_ns: i64 = slot_ms * 1_000_000;

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Mode {
        Latest,
        Backward,
        Range,
    }

    let mode = match (start, end) {
        (None, None) => Mode::Latest,
        // A single bound (either one) is handled by backward pagination.
        (Some(_), None) => Mode::Backward,
        (None, Some(_)) => Mode::Backward,
        (Some(_), Some(_)) => Mode::Range,
    };

    // Exact window bounds in nanoseconds, used for filtering parsed bars.
    let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
    let end_ns = end.and_then(|e| e.timestamp_nanos_opt());

    // Window bounds in milliseconds aligned to the bar width, used as request cursors.
    let start_ms = start.map(|s| {
        let ms = s.timestamp_millis();
        if slot_ms > 0 {
            // Round down to a multiple of the bar width.
            (ms / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let end_ms = end.map(|e| {
        let ms = e.timestamp_millis();
        if slot_ms > 0 {
            // Round up to a multiple of the bar width.
            ((ms + slot_ms - 1) / slot_ms) * slot_ms
        } else {
            ms
        }
    });
    let now_ms = now.timestamp_millis();

    let symbol = bar_type.instrument_id().symbol;
    let inst = self.instrument_from_cache(symbol.inner())?;

    // Accumulated result, kept in ascending `ts_event` order by the merge logic below.
    let mut out: Vec<Bar> = Vec::new();
    let mut pages = 0usize;

    // Initial cursors. NOTE(review): these locals map onto the request's `after`/`before`
    // parameters differently per mode (see the `match mode` in the loop); confirm the
    // parameter semantics against the OKX API documentation.
    let mut after_ms: Option<i64> = match mode {
        Mode::Range => end_ms.or(Some(now_ms)),
        _ => None,
    };
    let mut before_ms: Option<i64> = match mode {
        Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
        Mode::Range => start_ms,
        Mode::Latest => None,
    };

    // When true in Range mode, new pages are prepended (older data) rather than appended.
    let mut forward_prepend_mode = matches!(mode, Mode::Range);

    // Keep the initial `before_ms` cursor at least one bar width (minimum 60 s) behind now.
    if matches!(mode, Mode::Backward | Mode::Range)
        && let Some(b) = before_ms
    {
        let buffer_ms = slot_ms.max(60_000);
        if b >= now_ms.saturating_sub(buffer_ms) {
            before_ms = Some(now_ms.saturating_sub(buffer_ms));
        }
    }

    // Latest mode: set once a page has contributed (or the initial request came back empty).
    let mut have_latest_first_page = false;
    // Consecutive iterations that added nothing to `out`; three in a row stops the loop.
    let mut progressless_loops = 0u8;

    loop {
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if pages >= MAX_PAGES_SOFT {
            break;
        }

        // Pick the endpoint and page size from the age of the current cursor:
        // older than one hour uses the history endpoint with at most 100 rows per page,
        // otherwise the regular endpoint with at most 300.
        let pivot_ms = if let Some(a) = after_ms {
            a
        } else if let Some(b) = before_ms {
            b
        } else {
            now_ms
        };
        let age_ms = now_ms.saturating_sub(pivot_ms);
        let age_hours = age_ms / (60 * 60 * 1000);
        let using_history = age_hours > 1;
        let page_ceiling = if using_history { 100 } else { 300 };
        let remaining = limit
            .filter(|&l| l > 0)
            .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
        let page_cap = remaining.min(page_ceiling);

        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(page_cap as u32);

        // Tracks whether this request set the `before` parameter; used by the merge step.
        let mut req_used_before = false;

        match mode {
            Mode::Latest => {
                // The first Latest request carries no cursor.
                if have_latest_first_page && let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
            Mode::Backward => {
                if let Some(b) = before_ms {
                    // The local `before_ms` cursor is sent as the request's `after` parameter.
                    p.after_ms(b);
                }
            }
            Mode::Range => {
                if let Some(a) = after_ms {
                    p.after_ms(a);
                }
                if let Some(b) = before_ms {
                    p.before_ms(b);
                    req_used_before = true;
                }
            }
        }

        let params = p.build().map_err(anyhow::Error::new)?;

        let mut raw = if using_history {
            self.inner
                .get_history_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        } else {
            self.inner
                .get_candles(params.clone())
                .await
                .map_err(anyhow::Error::new)?
        };

        // Empty-page recovery. Each branch below either replaces `raw`, adjusts the cursor
        // and retries, or gives up.
        if raw.is_empty() {
            // Latest mode after the first page: retry the same cursor on the history endpoint.
            if matches!(mode, Mode::Latest)
                && have_latest_first_page
                && !using_history
                && let Some(b) = before_ms
            {
                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32);
                p2.before_ms(b);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                let raw2 = self
                    .inner
                    .get_history_candles(params2)
                    .await
                    .map_err(anyhow::Error::new)?;
                if !raw2.is_empty() {
                    raw = raw2;
                } else {
                    // Still nothing: step the cursor back by one page worth of bars and retry.
                    let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
                    before_ms = Some(b.saturating_sub(jump));
                    progressless_loops = progressless_loops.saturating_add(1);
                    if progressless_loops >= 3 {
                        break;
                    }
                    continue;
                }
            }

            // Range mode after at least one page: retry one page width further back.
            if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
                let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
                let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);

                let mut p2 = GetCandlesticksParamsBuilder::default();
                p2.inst_id(symbol.as_str())
                    .bar(&bar_param)
                    .limit(page_cap as u32)
                    .before_ms(pivot_back);
                let params2 = p2.build().map_err(anyhow::Error::new)?;
                // Endpoint choice here uses the day-based threshold, not the 1-hour rule above.
                let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
                    > HISTORY_SPLIT_DAYS
                {
                    self.inner.get_history_candles(params2).await
                } else {
                    self.inner.get_candles(params2).await
                }
                .map_err(anyhow::Error::new)?;
                if raw2.is_empty() {
                    break;
                } else {
                    raw = raw2;
                    forward_prepend_mode = true;
                    req_used_before = true;
                }
            }

            // Latest mode, very first request empty: jump the cursor back past the
            // history split and retry.
            if raw.is_empty()
                && matches!(mode, Mode::Latest)
                && !have_latest_first_page
                && !using_history
            {
                let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
                before_ms = Some(now_ms.saturating_sub(jump_days_ms));
                have_latest_first_page = true;
                continue;
            }

            // No recovery path produced data.
            if raw.is_empty() {
                break;
            }
        }
        pages += 1;

        // Parse the page; any parse failure aborts the whole request.
        let ts_init = self.generate_ts_init();
        let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
        for r in &raw {
            page.push(parse_candlestick(
                r,
                bar_type,
                inst.price_precision(),
                inst.size_precision(),
                ts_init,
            )?);
        }
        // Reverse so the page can be merged into `out` in ascending order
        // (assumes the API returns candles newest first; confirm).
        page.reverse();

        let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
        let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);

        // Filter the page to the requested window. The first Range page tolerates bars up
        // to two bar widths before `start`; every other page uses the exact bounds.
        let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
            && out.is_empty()
            && pages < 2
        {
            let tolerance_ns = slot_ns * 2;
            if !page.is_empty() {
                tracing::debug!(
                    "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
                    page.len(),
                    page.first().unwrap().ts_event.as_i64() / 1_000_000,
                    page.last().unwrap().ts_event.as_i64() / 1_000_000,
                    start_ms,
                    end_ms
                );
            }

            let result: Vec<Bar> = page
                .clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after =
                        start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();

            result
        } else {
            page.clone()
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect()
        };

        // Range mode, appending: if the whole page lies more than two bar widths before
        // `start`, there is nothing further to collect.
        if !page.is_empty() && filtered.is_empty() {
            if matches!(mode, Mode::Range)
                && !forward_prepend_mode
                && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
                && newest_ms < start_ms.saturating_sub(slot_ms * 2)
            {
                break;
            }
        }

        // Number of bars this page adds to `out` after overlap removal.
        let contribution;

        if out.is_empty() {
            contribution = filtered.len();
            out = filtered;
        } else {
            match mode {
                Mode::Backward | Mode::Latest => {
                    // Prepend only bars strictly older than what is already collected.
                    if let Some(first) = out.first() {
                        filtered.retain(|b| b.ts_event < first.ts_event);
                    }
                    contribution = filtered.len();
                    if contribution != 0 {
                        let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                        new_out.extend_from_slice(&filtered);
                        new_out.extend_from_slice(&out);
                        out = new_out;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        // Prepend only bars strictly older than the current first bar.
                        if let Some(first) = out.first() {
                            filtered.retain(|b| b.ts_event < first.ts_event);
                        }
                        contribution = filtered.len();
                        if contribution != 0 {
                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
                            new_out.extend_from_slice(&filtered);
                            new_out.extend_from_slice(&out);
                            out = new_out;
                        }
                    } else {
                        // Append only bars strictly newer than the current last bar.
                        if let Some(last) = out.last() {
                            filtered.retain(|b| b.ts_event > last.ts_event);
                        }
                        contribution = filtered.len();
                        out.extend(filtered);
                    }
                }
            }
        }

        // Nothing added: step the `before_ms` cursor back by one page worth of bars.
        if contribution == 0
            && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
            && let Some(b) = before_ms
        {
            let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
            let new_b = b.saturating_sub(jump);
            if new_b != b {
                before_ms = Some(new_b);
            }
        }

        if contribution == 0 {
            progressless_loops = progressless_loops.saturating_add(1);
            if progressless_loops >= 3 {
                break;
            }
        } else {
            progressless_loops = 0;

            // Advance the cursor(s) based on the page just merged.
            match mode {
                Mode::Latest | Mode::Backward => {
                    if let Some(oldest) = page_oldest_ms {
                        before_ms = Some(oldest.saturating_sub(1));
                        have_latest_first_page = true;
                    } else {
                        break;
                    }
                }
                Mode::Range => {
                    if forward_prepend_mode || req_used_before {
                        if let Some(oldest) = page_oldest_ms {
                            // Step back by at least one bar width (minimum 60 s).
                            let jump_back = slot_ms.max(60_000);
                            before_ms = Some(oldest.saturating_sub(jump_back));
                            after_ms = None;
                        } else {
                            break;
                        }
                    } else if let Some(newest) = page_newest_ms {
                        after_ms = Some(newest.saturating_add(1));
                        before_ms = None;
                    } else {
                        break;
                    }
                }
            }
        }

        // Termination checks: limit reached, `end` reached, or `start` reached.
        if let Some(lim) = limit
            && lim > 0
            && out.len() >= lim as usize
        {
            break;
        }
        if let Some(ens) = end_ns
            && let Some(last) = out.last()
            && last.ts_event.as_i64() >= ens
        {
            break;
        }
        if let Some(sns) = start_ns
            && let Some(first) = out.first()
            && (matches!(mode, Mode::Backward) || forward_prepend_mode)
            && first.ts_event.as_i64() <= sns
        {
            if matches!(mode, Mode::Range) {
                // `start` is covered but the tail has not reached `end` yet: switch to
                // appending pages that follow the last collected bar.
                if let Some(ens) = end_ns
                    && let Some(last) = out.last()
                {
                    let last_ts = last.ts_event.as_i64();
                    if last_ts < ens {
                        forward_prepend_mode = false;
                        after_ms = Some((last_ts / 1_000_000).saturating_add(1));
                        before_ms = None;
                        continue;
                    }
                }
            }
            break;
        }

        // Brief pause between page requests.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }

    // Range mode produced nothing: make one last request anchored at `end` and filter it
    // with the exact window bounds.
    if out.is_empty() && matches!(mode, Mode::Range) {
        let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
        let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
        let mut p = GetCandlesticksParamsBuilder::default();
        p.inst_id(symbol.as_str())
            .bar(&bar_param)
            .limit(300)
            .before_ms(pivot);
        let params = p.build().map_err(anyhow::Error::new)?;
        let raw = if hist {
            self.inner.get_history_candles(params).await
        } else {
            self.inner.get_candles(params).await
        }
        .map_err(anyhow::Error::new)?;
        if !raw.is_empty() {
            let ts_init = self.generate_ts_init();
            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
            for r in &raw {
                page.push(parse_candlestick(
                    r,
                    bar_type,
                    inst.price_precision(),
                    inst.size_precision(),
                    ts_init,
                )?);
            }
            page.reverse();
            out = page
                .into_iter()
                .filter(|b| {
                    let ts = b.ts_event.as_i64();
                    let ok_after = start_ns.is_none_or(|sns| ts >= sns);
                    let ok_before = end_ns.is_none_or(|ens| ts <= ens);
                    ok_after && ok_before
                })
                .collect();
        }
    }

    // Drop trailing bars that fall after `end`.
    if let Some(ens) = end_ns {
        while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
            out.pop();
        }
    }

    // Range mode, appending: drop leading bars more than one bar width before `start`.
    // NOTE(review): `remove(0)` in a loop is O(n^2); a single `drain(..k)` would be linear.
    if matches!(mode, Mode::Range)
        && !forward_prepend_mode
        && let Some(sns) = start_ns
    {
        let lower = sns.saturating_sub(slot_ns);
        while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
            out.remove(0);
        }
    }

    // NOTE(review): `truncate` keeps the first (oldest) `lim` bars of `out`; confirm this
    // is the intended side to trim when more than `lim` bars were collected.
    if let Some(lim) = limit
        && lim > 0
        && out.len() > lim as usize
    {
        out.truncate(lim as usize);
    }

    Ok(out)
}
2488
2489 #[allow(clippy::too_many_arguments)]
2500 pub async fn request_order_status_reports(
2501 &self,
2502 account_id: AccountId,
2503 instrument_type: Option<OKXInstrumentType>,
2504 instrument_id: Option<InstrumentId>,
2505 start: Option<DateTime<Utc>>,
2506 end: Option<DateTime<Utc>>,
2507 open_only: bool,
2508 limit: Option<u32>,
2509 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2510 let mut history_params = GetOrderHistoryParamsBuilder::default();
2511
2512 let instrument_type = if let Some(instrument_type) = instrument_type {
2513 instrument_type
2514 } else {
2515 let instrument_id = instrument_id.ok_or_else(|| {
2516 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2517 })?;
2518 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2519 okx_instrument_type(&instrument)?
2520 };
2521
2522 history_params.inst_type(instrument_type);
2523
2524 if let Some(instrument_id) = instrument_id.as_ref() {
2525 history_params.inst_id(instrument_id.symbol.inner().to_string());
2526 }
2527
2528 if let Some(limit) = limit {
2529 history_params.limit(limit);
2530 }
2531
2532 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2533
2534 let mut pending_params = GetOrderListParamsBuilder::default();
2535 pending_params.inst_type(instrument_type);
2536
2537 if let Some(instrument_id) = instrument_id.as_ref() {
2538 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2539 }
2540
2541 if let Some(limit) = limit {
2542 pending_params.limit(limit);
2543 }
2544
2545 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2546
2547 let combined_resp = if open_only {
2548 self.inner
2550 .get_orders_pending(pending_params)
2551 .await
2552 .map_err(|e| anyhow::anyhow!(e))?
2553 } else {
2554 let (history_resp, pending_resp) = tokio::try_join!(
2556 self.inner.get_orders_history(history_params),
2557 self.inner.get_orders_pending(pending_params)
2558 )
2559 .map_err(|e| anyhow::anyhow!(e))?;
2560
2561 let mut combined_resp = history_resp;
2563 combined_resp.extend(pending_resp);
2564 combined_resp
2565 };
2566
2567 let start_ns = start.map(UnixNanos::from);
2569 let end_ns = end.map(UnixNanos::from);
2570
2571 let ts_init = self.generate_ts_init();
2572 let mut reports = Vec::with_capacity(combined_resp.len());
2573
2574 let mut seen: AHashSet<String> = AHashSet::new();
2576
2577 for order in combined_resp {
2578 let seen_key = if !order.cl_ord_id.is_empty() {
2579 order.cl_ord_id.as_str().to_string()
2580 } else if let Some(algo_cl_ord_id) = order
2581 .algo_cl_ord_id
2582 .as_ref()
2583 .filter(|value| !value.as_str().is_empty())
2584 {
2585 algo_cl_ord_id.as_str().to_string()
2586 } else if let Some(algo_id) = order
2587 .algo_id
2588 .as_ref()
2589 .filter(|value| !value.as_str().is_empty())
2590 {
2591 algo_id.as_str().to_string()
2592 } else {
2593 order.ord_id.as_str().to_string()
2594 };
2595
2596 if !seen.insert(seen_key) {
2597 continue; }
2599
2600 let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
2601 tracing::debug!(
2602 symbol = %order.inst_id,
2603 "Skipping order report for instrument not in cache"
2604 );
2605 continue;
2606 };
2607
2608 let report = match parse_order_status_report(
2609 &order,
2610 account_id,
2611 inst.id(),
2612 inst.price_precision(),
2613 inst.size_precision(),
2614 ts_init,
2615 ) {
2616 Ok(report) => report,
2617 Err(e) => {
2618 tracing::error!("Failed to parse order status report: {e}");
2619 continue;
2620 }
2621 };
2622
2623 if let Some(start_ns) = start_ns
2624 && report.ts_last < start_ns
2625 {
2626 continue;
2627 }
2628 if let Some(end_ns) = end_ns
2629 && report.ts_last > end_ns
2630 {
2631 continue;
2632 }
2633
2634 reports.push(report);
2635 }
2636
2637 Ok(reports)
2638 }
2639
2640 pub async fn request_fill_reports(
2650 &self,
2651 account_id: AccountId,
2652 instrument_type: Option<OKXInstrumentType>,
2653 instrument_id: Option<InstrumentId>,
2654 start: Option<DateTime<Utc>>,
2655 end: Option<DateTime<Utc>>,
2656 limit: Option<u32>,
2657 ) -> anyhow::Result<Vec<FillReport>> {
2658 let mut params = GetTransactionDetailsParamsBuilder::default();
2659
2660 let instrument_type = if let Some(instrument_type) = instrument_type {
2661 instrument_type
2662 } else {
2663 let instrument_id = instrument_id.ok_or_else(|| {
2664 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2665 })?;
2666 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2667 okx_instrument_type(&instrument)?
2668 };
2669
2670 params.inst_type(instrument_type);
2671
2672 if let Some(instrument_id) = instrument_id {
2673 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2674 let instrument_type = okx_instrument_type(&instrument)?;
2675 params.inst_type(instrument_type);
2676 params.inst_id(instrument_id.symbol.inner().to_string());
2677 }
2678
2679 if let Some(limit) = limit {
2680 params.limit(limit);
2681 }
2682
2683 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2684
2685 let resp = self
2686 .inner
2687 .get_fills(params)
2688 .await
2689 .map_err(|e| anyhow::anyhow!(e))?;
2690
2691 let start_ns = start.map(UnixNanos::from);
2693 let end_ns = end.map(UnixNanos::from);
2694
2695 let ts_init = self.generate_ts_init();
2696 let mut reports = Vec::with_capacity(resp.len());
2697
2698 for detail in resp {
2699 if detail.fill_sz.is_empty() {
2701 continue;
2702 }
2703 if let Ok(qty) = detail.fill_sz.parse::<f64>() {
2704 if qty <= 0.0 {
2705 continue;
2706 }
2707 } else {
2708 continue;
2710 }
2711
2712 let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
2713 tracing::debug!(
2714 symbol = %detail.inst_id,
2715 "Skipping fill report for instrument not in cache"
2716 );
2717 continue;
2718 };
2719
2720 let report = match parse_fill_report(
2721 detail,
2722 account_id,
2723 inst.id(),
2724 inst.price_precision(),
2725 inst.size_precision(),
2726 ts_init,
2727 ) {
2728 Ok(report) => report,
2729 Err(e) => {
2730 tracing::error!("Failed to parse fill report: {e}");
2731 continue;
2732 }
2733 };
2734
2735 if let Some(start_ns) = start_ns
2736 && report.ts_event < start_ns
2737 {
2738 continue;
2739 }
2740
2741 if let Some(end_ns) = end_ns
2742 && report.ts_event > end_ns
2743 {
2744 continue;
2745 }
2746
2747 reports.push(report);
2748 }
2749
2750 Ok(reports)
2751 }
2752
2753 pub async fn request_position_status_reports(
2780 &self,
2781 account_id: AccountId,
2782 instrument_type: Option<OKXInstrumentType>,
2783 instrument_id: Option<InstrumentId>,
2784 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2785 let mut params = GetPositionsParamsBuilder::default();
2786
2787 let instrument_type = if let Some(instrument_type) = instrument_type {
2788 instrument_type
2789 } else {
2790 let instrument_id = instrument_id.ok_or_else(|| {
2791 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2792 })?;
2793 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2794 okx_instrument_type(&instrument)?
2795 };
2796
2797 params.inst_type(instrument_type);
2798
2799 instrument_id
2800 .as_ref()
2801 .map(|i| params.inst_id(i.symbol.inner()));
2802
2803 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2804
2805 let resp = self
2806 .inner
2807 .get_positions(params)
2808 .await
2809 .map_err(|e| anyhow::anyhow!(e))?;
2810
2811 let ts_init = self.generate_ts_init();
2812 let mut reports = Vec::with_capacity(resp.len());
2813
2814 for position in resp {
2815 let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
2816 tracing::debug!(
2817 symbol = %position.inst_id,
2818 "Skipping position report for instrument not in cache"
2819 );
2820 continue;
2821 };
2822
2823 match parse_position_status_report(
2824 position,
2825 account_id,
2826 inst.id(),
2827 inst.size_precision(),
2828 ts_init,
2829 ) {
2830 Ok(report) => reports.push(report),
2831 Err(e) => {
2832 tracing::error!("Failed to parse position status report: {e}");
2833 continue;
2834 }
2835 };
2836 }
2837
2838 Ok(reports)
2839 }
2840
2841 pub async fn request_spot_margin_position_reports(
2856 &self,
2857 account_id: AccountId,
2858 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2859 let accounts = self
2860 .inner
2861 .get_balance()
2862 .await
2863 .map_err(|e| anyhow::anyhow!(e))?;
2864
2865 let ts_init = self.generate_ts_init();
2866 let mut reports = Vec::new();
2867
2868 for account in accounts {
2869 for balance in account.details {
2870 let ccy_str = balance.ccy.as_str();
2871
2872 let potential_symbols = [
2874 format!("{ccy_str}-USDT"),
2875 format!("{ccy_str}-USD"),
2876 format!("{ccy_str}-USDC"),
2877 ];
2878
2879 let instrument_result = potential_symbols.iter().find_map(|symbol| {
2880 self.instrument_from_cache(Ustr::from(symbol))
2881 .ok()
2882 .map(|inst| (inst.id(), inst.size_precision()))
2883 });
2884
2885 let (instrument_id, size_precision) = match instrument_result {
2886 Some((id, prec)) => (id, prec),
2887 None => {
2888 tracing::debug!(
2889 "Skipping balance for {} - no matching instrument in cache",
2890 ccy_str
2891 );
2892 continue;
2893 }
2894 };
2895
2896 match parse_spot_margin_position_from_balance(
2897 &balance,
2898 account_id,
2899 instrument_id,
2900 size_precision,
2901 ts_init,
2902 ) {
2903 Ok(Some(report)) => reports.push(report),
2904 Ok(None) => {} Err(e) => {
2906 tracing::error!(
2907 "Failed to parse spot margin position from balance for {}: {e}",
2908 ccy_str
2909 );
2910 continue;
2911 }
2912 };
2913 }
2914 }
2915
2916 Ok(reports)
2917 }
2918
2919 pub async fn place_algo_order(
2929 &self,
2930 request: OKXPlaceAlgoOrderRequest,
2931 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2932 let body =
2933 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2934
2935 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2936 .inner
2937 .send_request::<_, ()>(
2938 Method::POST,
2939 "/api/v5/trade/order-algo",
2940 None,
2941 Some(body),
2942 true,
2943 )
2944 .await?;
2945
2946 resp.into_iter()
2947 .next()
2948 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2949 }
2950
2951 pub async fn cancel_algo_order(
2961 &self,
2962 request: OKXCancelAlgoOrderRequest,
2963 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2964 let body =
2967 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2968
2969 let resp: Vec<OKXCancelAlgoOrderResponse> = self
2970 .inner
2971 .send_request::<_, ()>(
2972 Method::POST,
2973 "/api/v5/trade/cancel-algos",
2974 None,
2975 Some(body),
2976 true,
2977 )
2978 .await?;
2979
2980 resp.into_iter()
2981 .next()
2982 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2983 }
2984
2985 #[allow(clippy::too_many_arguments)]
2994 pub async fn place_algo_order_with_domain_types(
2995 &self,
2996 instrument_id: InstrumentId,
2997 td_mode: OKXTradeMode,
2998 client_order_id: ClientOrderId,
2999 order_side: OrderSide,
3000 order_type: OrderType,
3001 quantity: Quantity,
3002 trigger_price: Price,
3003 trigger_type: Option<TriggerType>,
3004 limit_price: Option<Price>,
3005 reduce_only: Option<bool>,
3006 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3007 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
3008 return Err(OKXHttpError::ValidationError(
3009 "Invalid order side".to_string(),
3010 ));
3011 }
3012 let okx_side: OKXSide = order_side.into();
3013
3014 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
3016
3017 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
3019 limit_price.map(|p| p.to_string())
3020 } else {
3021 Some("-1".to_string())
3023 };
3024
3025 let request = OKXPlaceAlgoOrderRequest {
3026 inst_id: instrument_id.symbol.as_str().to_string(),
3027 td_mode,
3028 side: okx_side,
3029 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
3031 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
3032 trigger_px: Some(trigger_price.to_string()),
3033 order_px,
3034 trigger_px_type: Some(trigger_px_type_enum),
3035 tgt_ccy: None, pos_side: None, close_position: None,
3038 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
3039 reduce_only,
3040 };
3041
3042 self.place_algo_order(request).await
3043 }
3044
3045 pub async fn cancel_algo_order_with_domain_types(
3054 &self,
3055 instrument_id: InstrumentId,
3056 algo_id: String,
3057 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3058 let request = OKXCancelAlgoOrderRequest {
3059 inst_id: instrument_id.symbol.to_string(),
3060 algo_id: Some(algo_id),
3061 algo_cl_ord_id: None,
3062 };
3063
3064 self.cancel_algo_order(request).await
3065 }
3066
3067 #[allow(clippy::too_many_arguments)]
3073 pub async fn request_algo_order_status_reports(
3074 &self,
3075 account_id: AccountId,
3076 instrument_type: Option<OKXInstrumentType>,
3077 instrument_id: Option<InstrumentId>,
3078 algo_id: Option<String>,
3079 algo_client_order_id: Option<ClientOrderId>,
3080 state: Option<OKXOrderStatus>,
3081 limit: Option<u32>,
3082 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3083 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
3084
3085 let inst_type = if let Some(inst_type) = instrument_type {
3086 inst_type
3087 } else if let Some(inst_id) = instrument_id {
3088 let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
3089 let inst_type = okx_instrument_type(&instrument)?;
3090 instruments_cache.insert(inst_id.symbol.inner(), instrument);
3091 inst_type
3092 } else {
3093 anyhow::bail!("instrument_type or instrument_id required for algo order query")
3094 };
3095
3096 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
3097 params_builder.inst_type(inst_type);
3098 if let Some(inst_id) = instrument_id {
3099 params_builder.inst_id(inst_id.symbol.inner().to_string());
3100 }
3101 if let Some(algo_id) = algo_id.as_ref() {
3102 params_builder.algo_id(algo_id.clone());
3103 }
3104 if let Some(client_order_id) = algo_client_order_id.as_ref() {
3105 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
3106 }
3107 if let Some(state) = state {
3108 params_builder.state(state);
3109 }
3110 if let Some(limit) = limit {
3111 params_builder.limit(limit);
3112 }
3113
3114 let params = params_builder
3115 .build()
3116 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
3117
3118 let ts_init = self.generate_ts_init();
3119 let mut reports = Vec::new();
3120 let mut seen: AHashSet<(String, String)> = AHashSet::new();
3121
3122 let pending = match self.inner.get_order_algo_pending(params.clone()).await {
3123 Ok(result) => result,
3124 Err(OKXHttpError::UnexpectedStatus { status, .. })
3125 if status == StatusCode::NOT_FOUND =>
3126 {
3127 Vec::new()
3128 }
3129 Err(e) => return Err(e.into()),
3130 };
3131 self.collect_algo_reports(
3132 account_id,
3133 &pending,
3134 &mut instruments_cache,
3135 ts_init,
3136 &mut seen,
3137 &mut reports,
3138 )
3139 .await?;
3140
3141 let history = match self.inner.get_order_algo_history(params).await {
3142 Ok(result) => result,
3143 Err(OKXHttpError::UnexpectedStatus { status, .. })
3144 if status == StatusCode::NOT_FOUND =>
3145 {
3146 Vec::new()
3147 }
3148 Err(e) => return Err(e.into()),
3149 };
3150 self.collect_algo_reports(
3151 account_id,
3152 &history,
3153 &mut instruments_cache,
3154 ts_init,
3155 &mut seen,
3156 &mut reports,
3157 )
3158 .await?;
3159
3160 Ok(reports)
3161 }
3162
3163 pub async fn request_algo_order_status_report(
3169 &self,
3170 account_id: AccountId,
3171 instrument_id: InstrumentId,
3172 algo_client_order_id: ClientOrderId,
3173 ) -> anyhow::Result<Option<OrderStatusReport>> {
3174 let reports = self
3175 .request_algo_order_status_reports(
3176 account_id,
3177 None,
3178 Some(instrument_id),
3179 None,
3180 Some(algo_client_order_id),
3181 None,
3182 Some(50_u32),
3183 )
3184 .await?;
3185
3186 Ok(reports.into_iter().next())
3187 }
3188
3189 pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
3191 &self.inner
3192 }
3193
3194 async fn collect_algo_reports(
3195 &self,
3196 account_id: AccountId,
3197 orders: &[OKXOrderAlgo],
3198 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
3199 ts_init: UnixNanos,
3200 seen: &mut AHashSet<(String, String)>,
3201 reports: &mut Vec<OrderStatusReport>,
3202 ) -> anyhow::Result<()> {
3203 for order in orders {
3204 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
3205 if !seen.insert(key) {
3206 continue;
3207 }
3208
3209 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
3210 instrument.clone()
3211 } else {
3212 let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
3213 tracing::debug!(
3214 symbol = %order.inst_id,
3215 "Skipping algo order report for instrument not in cache"
3216 );
3217 continue;
3218 };
3219 instruments_cache.insert(order.inst_id, instrument.clone());
3220 instrument
3221 };
3222
3223 match parse_http_algo_order(order, account_id, &instrument, ts_init) {
3224 Ok(report) => reports.push(report),
3225 Err(e) => {
3226 tracing::error!("Failed to parse algo order report: {e}");
3227 }
3228 }
3229 }
3230
3231 Ok(())
3232 }
3233}
3234
3235fn parse_http_algo_order(
3236 order: &OKXOrderAlgo,
3237 account_id: AccountId,
3238 instrument: &InstrumentAny,
3239 ts_init: UnixNanos,
3240) -> anyhow::Result<OrderStatusReport> {
3241 let ord_px = if order.ord_px.is_empty() {
3242 "-1".to_string()
3243 } else {
3244 order.ord_px.clone()
3245 };
3246
3247 let reduce_only = if order.reduce_only.is_empty() {
3248 "false".to_string()
3249 } else {
3250 order.reduce_only.clone()
3251 };
3252
3253 let msg = OKXAlgoOrderMsg {
3254 algo_id: order.algo_id.clone(),
3255 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
3256 cl_ord_id: order.cl_ord_id.clone(),
3257 ord_id: order.ord_id.clone(),
3258 inst_id: order.inst_id,
3259 inst_type: order.inst_type,
3260 ord_type: order.ord_type,
3261 state: order.state,
3262 side: order.side,
3263 pos_side: order.pos_side,
3264 sz: order.sz.clone(),
3265 trigger_px: order.trigger_px.clone(),
3266 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
3267 ord_px,
3268 td_mode: order.td_mode,
3269 lever: order.lever.clone(),
3270 reduce_only,
3271 actual_px: order.actual_px.clone(),
3272 actual_sz: order.actual_sz.clone(),
3273 notional_usd: order.notional_usd.clone(),
3274 c_time: order.c_time,
3275 u_time: order.u_time,
3276 trigger_time: order.trigger_time.clone(),
3277 tag: order.tag.clone(),
3278 };
3279
3280 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
3281}