1use std::{
36 collections::HashMap,
37 fmt::Debug,
38 num::NonZeroU32,
39 str::FromStr,
40 sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::{AHashMap, AHashSet};
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46 MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
47 time::get_atomic_clock_realtime,
48};
49use nautilus_model::{
50 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
51 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
52 events::AccountState,
53 identifiers::{AccountId, ClientOrderId, InstrumentId},
54 instruments::{Instrument, InstrumentAny},
55 reports::{FillReport, OrderStatusReport, PositionStatusReport},
56 types::{Price, Quantity},
57};
58use nautilus_network::{
59 http::HttpClient,
60 ratelimiter::quota::Quota,
61 retry::{RetryConfig, RetryManager},
62};
63use reqwest::{Method, StatusCode, header::USER_AGENT};
64use rust_decimal::Decimal;
65use serde::{Deserialize, Serialize, de::DeserializeOwned};
66use tokio_util::sync::CancellationToken;
67use ustr::Ustr;
68
69use super::{
70 error::OKXHttpError,
71 models::{
72 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
73 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
74 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
75 OKXTransactionDetail,
76 },
77 query::{
78 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
79 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
80 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
81 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
82 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
83 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
84 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
85 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
86 },
87};
88use crate::{
89 common::{
90 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
91 credential::Credential,
92 enums::{
93 OKXAlgoOrderType, OKXInstrumentType, OKXOrderStatus, OKXPositionMode, OKXSide,
94 OKXTradeMode, OKXTriggerType,
95 },
96 models::OKXInstrument,
97 parse::{
98 okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
99 parse_index_price_update, parse_instrument_any, parse_mark_price_update,
100 parse_order_status_report, parse_position_status_report, parse_trade_tick,
101 },
102 },
103 http::{
104 models::{OKXCandlestick, OKXTrade},
105 query::{GetOrderParams, GetPendingOrdersParams},
106 },
107 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
108};
109
/// Response `code` value that `send_request` treats as a successful OKX call.
const OKX_SUCCESS_CODE: &str = "0";

/// Quota of 250 requests per second; registered under the global rate-limit key
/// and also passed to the HTTP client as its default quota.
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));

/// Rate-limit key attached to every request regardless of endpoint.
const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
124
/// Generic envelope for OKX REST responses: a status code, a message and a
/// list of payload items.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// Status code returned by OKX; this client treats `"0"` as success.
    pub code: String,
    /// Message accompanying the code (surfaced as the error message on failure).
    pub msg: String,
    /// Payload items, deserialized as `T`.
    pub data: Vec<T>,
}
135
/// Low-level OKX HTTP client holding the underlying [`HttpClient`], optional
/// signing credentials, and the retry / cancellation state used by every request.
pub struct OKXHttpInnerClient {
    /// Base URL prepended to every request path.
    base_url: String,
    /// Underlying HTTP client, constructed with the rate-limit quotas.
    client: HttpClient,
    /// API credentials; when `None`, signing fails with `MissingCredentials`.
    credential: Option<Credential>,
    /// Retry policy wrapped around every request.
    retry_manager: RetryManager<OKXHttpError>,
    /// Token used to cancel in-flight requests.
    cancellation_token: CancellationToken,
    /// Whether the `x-simulated-trading` header is sent (demo trading).
    is_demo: bool,
}
149
150impl Default for OKXHttpInnerClient {
151 fn default() -> Self {
152 Self::new(None, Some(60), None, None, None, false)
153 .expect("Failed to create default OKXHttpInnerClient")
154 }
155}
156
157impl Debug for OKXHttpInnerClient {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 let credential = self.credential.as_ref().map(|_| "<redacted>");
160 f.debug_struct(stringify!(OKXHttpInnerClient))
161 .field("base_url", &self.base_url)
162 .field("credential", &credential)
163 .finish_non_exhaustive()
164 }
165}
166
167impl OKXHttpInnerClient {
168 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
169 vec![
170 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
171 (
172 "okx:/api/v5/account/balance".to_string(),
173 Quota::per_second(NonZeroU32::new(5).unwrap()),
174 ),
175 (
176 "okx:/api/v5/public/instruments".to_string(),
177 Quota::per_second(NonZeroU32::new(10).unwrap()),
178 ),
179 (
180 "okx:/api/v5/market/candles".to_string(),
181 Quota::per_second(NonZeroU32::new(50).unwrap()),
182 ),
183 (
184 "okx:/api/v5/market/history-candles".to_string(),
185 Quota::per_second(NonZeroU32::new(20).unwrap()),
186 ),
187 (
188 "okx:/api/v5/market/history-trades".to_string(),
189 Quota::per_second(NonZeroU32::new(30).unwrap()),
190 ),
191 (
192 "okx:/api/v5/trade/order".to_string(),
193 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
195 (
196 "okx:/api/v5/trade/orders-pending".to_string(),
197 Quota::per_second(NonZeroU32::new(20).unwrap()),
198 ),
199 (
200 "okx:/api/v5/trade/orders-history".to_string(),
201 Quota::per_second(NonZeroU32::new(20).unwrap()),
202 ),
203 (
204 "okx:/api/v5/trade/fills".to_string(),
205 Quota::per_second(NonZeroU32::new(30).unwrap()),
206 ),
207 (
208 "okx:/api/v5/trade/order-algo".to_string(),
209 Quota::per_second(NonZeroU32::new(10).unwrap()),
210 ),
211 (
212 "okx:/api/v5/trade/cancel-algos".to_string(),
213 Quota::per_second(NonZeroU32::new(10).unwrap()),
214 ),
215 ]
216 }
217
218 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
219 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
220 let route = format!("okx:{normalized}");
221
222 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
223 }
224
    /// Cancels the client's cancellation token, which `send_request` passes to
    /// the retry manager for every request.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
229
    /// Returns a reference to the cancellation token shared by all requests.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
234
235 pub fn new(
245 base_url: Option<String>,
246 timeout_secs: Option<u64>,
247 max_retries: Option<u32>,
248 retry_delay_ms: Option<u64>,
249 retry_delay_max_ms: Option<u64>,
250 is_demo: bool,
251 ) -> Result<Self, OKXHttpError> {
252 let retry_config = RetryConfig {
253 max_retries: max_retries.unwrap_or(3),
254 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
255 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
256 backoff_factor: 2.0,
257 jitter_ms: 1000,
258 operation_timeout_ms: Some(60_000),
259 immediate_first: false,
260 max_elapsed_ms: Some(180_000),
261 };
262
263 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
264 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
265 })?;
266
267 Ok(Self {
268 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
269 client: HttpClient::new(
270 Self::default_headers(is_demo),
271 vec![],
272 Self::rate_limiter_quotas(),
273 Some(*OKX_REST_QUOTA),
274 timeout_secs,
275 ),
276 credential: None,
277 retry_manager,
278 cancellation_token: CancellationToken::new(),
279 is_demo,
280 })
281 }
282
283 #[allow(clippy::too_many_arguments)]
290 pub fn with_credentials(
291 api_key: String,
292 api_secret: String,
293 api_passphrase: String,
294 base_url: String,
295 timeout_secs: Option<u64>,
296 max_retries: Option<u32>,
297 retry_delay_ms: Option<u64>,
298 retry_delay_max_ms: Option<u64>,
299 is_demo: bool,
300 ) -> Result<Self, OKXHttpError> {
301 let retry_config = RetryConfig {
302 max_retries: max_retries.unwrap_or(3),
303 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
304 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
305 backoff_factor: 2.0,
306 jitter_ms: 1000,
307 operation_timeout_ms: Some(60_000),
308 immediate_first: false,
309 max_elapsed_ms: Some(180_000),
310 };
311
312 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
313 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
314 })?;
315
316 Ok(Self {
317 base_url,
318 client: HttpClient::new(
319 Self::default_headers(is_demo),
320 vec![],
321 Self::rate_limiter_quotas(),
322 Some(*OKX_REST_QUOTA),
323 timeout_secs,
324 ),
325 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
326 retry_manager,
327 cancellation_token: CancellationToken::new(),
328 is_demo,
329 })
330 }
331
332 fn default_headers(is_demo: bool) -> HashMap<String, String> {
334 let mut headers =
335 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
336
337 if is_demo {
338 headers.insert("x-simulated-trading".to_string(), "1".to_string());
339 }
340
341 headers
342 }
343
344 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
350 let query = serde_urlencoded::to_string(params)
351 .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
352 if query.is_empty() {
353 Ok(base.to_owned())
354 } else {
355 Ok(format!("{base}?{query}"))
356 }
357 }
358
359 fn sign_request(
366 &self,
367 method: &Method,
368 path: &str,
369 body: Option<&[u8]>,
370 ) -> Result<HashMap<String, String>, OKXHttpError> {
371 let credential = match self.credential.as_ref() {
372 Some(c) => c,
373 None => return Err(OKXHttpError::MissingCredentials),
374 };
375
376 let api_key = credential.api_key.to_string();
377 let api_passphrase = credential.api_passphrase.clone();
378
379 let now = Utc::now();
381 let millis = now.timestamp_subsec_millis();
382 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
383 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
384
385 let mut headers = HashMap::new();
386 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
387 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
388 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
389 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
390
391 Ok(headers)
392 }
393
394 async fn send_request<T: DeserializeOwned>(
410 &self,
411 method: Method,
412 path: &str,
413 body: Option<Vec<u8>>,
414 authenticate: bool,
415 ) -> Result<Vec<T>, OKXHttpError> {
416 let url = format!("{}{path}", self.base_url);
417 let endpoint = path.to_string();
418 let method_clone = method.clone();
419 let body_clone = body.clone();
420
421 let operation = || {
422 let url = url.clone();
423 let method = method_clone.clone();
424 let body = body_clone.clone();
425 let endpoint = endpoint.clone();
426
427 async move {
428 let mut headers = if authenticate {
429 self.sign_request(&method, endpoint.as_str(), body.as_deref())?
430 } else {
431 HashMap::new()
432 };
433
434 if body.is_some() {
436 headers.insert("Content-Type".to_string(), "application/json".to_string());
437 }
438
439 let rate_keys = Self::rate_limit_keys(endpoint.as_str());
440 let resp = self
441 .client
442 .request_with_ustr_keys(
443 method.clone(),
444 url,
445 Some(headers),
446 body,
447 None,
448 Some(rate_keys),
449 )
450 .await?;
451
452 tracing::trace!("Response: {resp:?}");
453
454 if resp.status.is_success() {
455 let okx_response: OKXResponse<T> =
456 serde_json::from_slice(&resp.body).map_err(|e| {
457 tracing::error!("Failed to deserialize OKXResponse: {e}");
458 OKXHttpError::JsonError(e.to_string())
459 })?;
460
461 if okx_response.code != OKX_SUCCESS_CODE {
462 return Err(OKXHttpError::OkxError {
463 error_code: okx_response.code,
464 message: okx_response.msg,
465 });
466 }
467
468 Ok(okx_response.data)
469 } else {
470 let error_body = String::from_utf8_lossy(&resp.body);
471 if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
472 tracing::debug!("HTTP 404 with body: {error_body}");
473 } else {
474 tracing::error!(
475 "HTTP error {} with body: {error_body}",
476 resp.status.as_str()
477 );
478 }
479
480 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
481 return Err(OKXHttpError::OkxError {
482 error_code: parsed_error.code,
483 message: parsed_error.msg,
484 });
485 }
486
487 Err(OKXHttpError::UnexpectedStatus {
488 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
489 body: error_body.to_string(),
490 })
491 }
492 }
493 };
494
495 let should_retry = |error: &OKXHttpError| -> bool {
504 match error {
505 OKXHttpError::HttpClientError(_) => true,
506 OKXHttpError::UnexpectedStatus { status, .. } => {
507 status.as_u16() >= 500 || status.as_u16() == 429
508 }
509 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
510 _ => false,
511 }
512 };
513
514 let create_error = |msg: String| -> OKXHttpError {
515 if msg == "canceled" {
516 OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
517 } else {
518 OKXHttpError::ValidationError(msg)
519 }
520 };
521
522 self.retry_manager
523 .execute_with_retry_with_cancel(
524 endpoint.as_str(),
525 operation,
526 should_retry,
527 create_error,
528 &self.cancellation_token,
529 )
530 .await
531 }
532
533 pub async fn http_set_position_mode(
544 &self,
545 params: SetPositionModeParams,
546 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
547 let path = "/api/v5/account/set-position-mode";
548 let body = serde_json::to_vec(¶ms)?;
549 self.send_request(Method::POST, path, Some(body), true)
550 .await
551 }
552
553 pub async fn http_get_position_tiers(
564 &self,
565 params: GetPositionTiersParams,
566 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
567 let path = Self::build_path("/api/v5/public/position-tiers", ¶ms)?;
568 self.send_request(Method::GET, &path, None, false).await
569 }
570
571 pub async fn http_get_instruments(
582 &self,
583 params: GetInstrumentsParams,
584 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
585 let path = Self::build_path("/api/v5/public/instruments", ¶ms)?;
586 self.send_request(Method::GET, &path, None, false).await
587 }
588
589 pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
603 let response: Vec<OKXServerTime> = self
604 .send_request(Method::GET, "/api/v5/public/time", None, false)
605 .await?;
606 response
607 .first()
608 .map(|t| t.ts)
609 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
610 }
611
612 pub async fn http_get_mark_price(
626 &self,
627 params: GetMarkPriceParams,
628 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
629 let path = Self::build_path("/api/v5/public/mark-price", ¶ms)?;
630 self.send_request(Method::GET, &path, None, false).await
631 }
632
633 pub async fn http_get_index_ticker(
643 &self,
644 params: GetIndexTickerParams,
645 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
646 let path = Self::build_path("/api/v5/market/index-tickers", ¶ms)?;
647 self.send_request(Method::GET, &path, None, false).await
648 }
649
650 pub async fn http_get_trades(
660 &self,
661 params: GetTradesParams,
662 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
663 let path = Self::build_path("/api/v5/market/history-trades", ¶ms)?;
664 self.send_request(Method::GET, &path, None, false).await
665 }
666
667 pub async fn http_get_candlesticks(
677 &self,
678 params: GetCandlesticksParams,
679 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
680 let path = Self::build_path("/api/v5/market/candles", ¶ms)?;
681 self.send_request(Method::GET, &path, None, false).await
682 }
683
684 pub async fn http_get_candlesticks_history(
694 &self,
695 params: GetCandlesticksParams,
696 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
697 let path = Self::build_path("/api/v5/market/history-candles", ¶ms)?;
698 self.send_request(Method::GET, &path, None, false).await
699 }
700
701 pub async fn http_get_pending_orders(
711 &self,
712 params: GetPendingOrdersParams,
713 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
714 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
715 self.send_request(Method::GET, &path, None, true).await
716 }
717
718 pub async fn http_get_order(
728 &self,
729 params: GetOrderParams,
730 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
731 let path = Self::build_path("/api/v5/trade/order", ¶ms)?;
732 self.send_request(Method::GET, &path, None, true).await
733 }
734
735 pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
746 let path = "/api/v5/account/balance";
747 self.send_request(Method::GET, path, None, true).await
748 }
749
750 pub async fn http_get_trade_fee(
762 &self,
763 params: GetTradeFeeParams,
764 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
765 let path = Self::build_path("/api/v5/account/trade-fee", ¶ms)?;
766 self.send_request(Method::GET, &path, None, true).await
767 }
768
769 pub async fn http_get_order_history(
779 &self,
780 params: GetOrderHistoryParams,
781 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
782 let path = Self::build_path("/api/v5/trade/orders-history", ¶ms)?;
783 self.send_request(Method::GET, &path, None, true).await
784 }
785
786 pub async fn http_get_order_list(
796 &self,
797 params: GetOrderListParams,
798 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
799 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
800 self.send_request(Method::GET, &path, None, true).await
801 }
802
803 pub async fn http_get_order_algo_pending(
809 &self,
810 params: GetAlgoOrdersParams,
811 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
812 let path = Self::build_path("/api/v5/trade/order-algo-pending", ¶ms)?;
813 self.send_request(Method::GET, &path, None, true).await
814 }
815
816 pub async fn http_get_order_algo_history(
822 &self,
823 params: GetAlgoOrdersParams,
824 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
825 let path = Self::build_path("/api/v5/trade/order-algo-history", ¶ms)?;
826 self.send_request(Method::GET, &path, None, true).await
827 }
828
829 pub async fn http_get_positions(
841 &self,
842 params: GetPositionsParams,
843 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
844 let path = Self::build_path("/api/v5/account/positions", ¶ms)?;
845 self.send_request(Method::GET, &path, None, true).await
846 }
847
848 pub async fn http_get_position_history(
858 &self,
859 params: GetPositionsHistoryParams,
860 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
861 let path = Self::build_path("/api/v5/account/positions-history", ¶ms)?;
862 self.send_request(Method::GET, &path, None, true).await
863 }
864
865 pub async fn http_get_transaction_details(
875 &self,
876 params: GetTransactionDetailsParams,
877 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
878 let path = Self::build_path("/api/v5/trade/fills", ¶ms)?;
879 self.send_request(Method::GET, &path, None, true).await
880 }
881}
882
/// Higher-level OKX HTTP client that wraps [`OKXHttpInnerClient`] and keeps a
/// cache of instruments keyed by raw symbol.
///
/// Cloning shares the inner client and the instrument cache (both are behind
/// `Arc`), but each clone carries its own `cache_initialized` flag.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared low-level client that performs the HTTP requests.
    pub(crate) inner: Arc<OKXHttpInnerClient>,
    /// Shared instrument cache, keyed by the instrument's raw symbol.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    /// Set to `true` once instruments have been added through this instance.
    cache_initialized: bool,
}
897
898impl Default for OKXHttpClient {
899 fn default() -> Self {
900 Self::new(None, Some(60), None, None, None, false)
901 .expect("Failed to create default OKXHttpClient")
902 }
903}
904
905impl OKXHttpClient {
906 pub fn new(
916 base_url: Option<String>,
917 timeout_secs: Option<u64>,
918 max_retries: Option<u32>,
919 retry_delay_ms: Option<u64>,
920 retry_delay_max_ms: Option<u64>,
921 is_demo: bool,
922 ) -> anyhow::Result<Self> {
923 Ok(Self {
924 inner: Arc::new(OKXHttpInnerClient::new(
925 base_url,
926 timeout_secs,
927 max_retries,
928 retry_delay_ms,
929 retry_delay_max_ms,
930 is_demo,
931 )?),
932 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
933 cache_initialized: false,
934 })
935 }
936
    /// Creates a non-demo client by calling `with_credentials` with every
    /// optional argument left as `None`, so the API key, secret and passphrase
    /// are resolved there from `OKX_API_KEY`, `OKX_API_SECRET` and
    /// `OKX_API_PASSPHRASE`.
    ///
    /// # Errors
    ///
    /// Returns an error if `with_credentials` fails.
    pub fn from_env() -> anyhow::Result<Self> {
        Self::with_credentials(None, None, None, None, None, None, None, None, false)
    }
946
947 #[allow(clippy::too_many_arguments)]
954 pub fn with_credentials(
955 api_key: Option<String>,
956 api_secret: Option<String>,
957 api_passphrase: Option<String>,
958 base_url: Option<String>,
959 timeout_secs: Option<u64>,
960 max_retries: Option<u32>,
961 retry_delay_ms: Option<u64>,
962 retry_delay_max_ms: Option<u64>,
963 is_demo: bool,
964 ) -> anyhow::Result<Self> {
965 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
966 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
967 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
968 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
969
970 Ok(Self {
971 inner: Arc::new(OKXHttpInnerClient::with_credentials(
972 api_key,
973 api_secret,
974 api_passphrase,
975 base_url,
976 timeout_secs,
977 max_retries,
978 retry_delay_ms,
979 retry_delay_max_ms,
980 is_demo,
981 )?),
982 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
983 cache_initialized: false,
984 })
985 }
986
987 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
993 self.instruments_cache
994 .lock()
995 .expect("`instruments_cache` lock poisoned")
996 .get(&symbol)
997 .cloned()
998 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
999 }
1000
1001 async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1002 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1003 return Ok(inst);
1004 }
1005
1006 for group in [
1007 OKXInstrumentType::Spot,
1008 OKXInstrumentType::Margin,
1009 OKXInstrumentType::Futures,
1010 OKXInstrumentType::Swap,
1011 OKXInstrumentType::Option,
1012 ] {
1013 if let Ok(instruments) = self.request_instruments(group, None).await {
1014 let mut guard = self.instruments_cache.lock().expect(MUTEX_POISONED);
1015 for inst in instruments {
1016 guard.insert(inst.raw_symbol().inner(), inst);
1017 }
1018 drop(guard);
1019
1020 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1021 return Ok(inst);
1022 }
1023 }
1024 }
1025
1026 anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
1027 }
1028
    /// Cancels all requests by delegating to the inner client.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1033
    /// Returns the inner client's cancellation token.
    pub fn cancellation_token(&self) -> &CancellationToken {
        self.inner.cancellation_token()
    }
1038
    /// Returns the base URL the inner client sends requests to.
    pub fn base_url(&self) -> &str {
        self.inner.base_url.as_str()
    }
1043
    /// Returns the configured API key, or `None` when the client has no credential.
    pub fn api_key(&self) -> Option<&str> {
        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
    }
1048
    /// Returns whether the inner client was created in demo mode.
    #[must_use]
    pub fn is_demo(&self) -> bool {
        self.inner.is_demo
    }
1054
    /// Requests the OKX server time by delegating to the inner client.
    ///
    /// # Errors
    ///
    /// Returns the inner client's error if the request fails or the response is empty.
    pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
        self.inner.http_get_server_time().await
    }
1065
    /// Returns whether instruments have been added to the cache through this
    /// instance (`add_instrument` / `add_instruments`).
    #[must_use]
    pub const fn is_initialized(&self) -> bool {
        self.cache_initialized
    }
1073
    /// Returns the current time in nanoseconds from the realtime atomic clock,
    /// used as the `ts_init` value for parsed objects.
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
1078
1079 #[must_use]
1081 pub fn get_cached_symbols(&self) -> Vec<String> {
1089 self.instruments_cache
1090 .lock()
1091 .unwrap()
1092 .keys()
1093 .map(std::string::ToString::to_string)
1094 .collect()
1095 }
1096
1097 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
1106 for inst in instruments {
1107 self.instruments_cache
1108 .lock()
1109 .unwrap()
1110 .insert(inst.raw_symbol().inner(), inst);
1111 }
1112 self.cache_initialized = true;
1113 }
1114
1115 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
1124 self.instruments_cache
1125 .lock()
1126 .unwrap()
1127 .insert(instrument.raw_symbol().inner(), instrument);
1128 self.cache_initialized = true;
1129 }
1130
1131 pub async fn request_account_state(
1137 &self,
1138 account_id: AccountId,
1139 ) -> anyhow::Result<AccountState> {
1140 let resp = self
1141 .inner
1142 .http_get_balance()
1143 .await
1144 .map_err(|e| anyhow::anyhow!(e))?;
1145
1146 let ts_init = self.generate_ts_init();
1147 let raw = resp
1148 .first()
1149 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1150 let account_state = parse_account_state(raw, account_id, ts_init)?;
1151
1152 Ok(account_state)
1153 }
1154
1155 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1168 let mut params = SetPositionModeParamsBuilder::default();
1169 params.pos_mode(position_mode);
1170 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1171
1172 match self.inner.http_set_position_mode(params).await {
1173 Ok(_) => Ok(()),
1174 Err(e) => {
1175 if let OKXHttpError::OkxError {
1176 error_code,
1177 message,
1178 } = &e
1179 && error_code == "50115"
1180 {
1181 tracing::warn!(
1182 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1183 );
1184 return Ok(()); }
1186 anyhow::bail!(e)
1187 }
1188 }
1189 }
1190
1191 pub async fn request_instruments(
1197 &self,
1198 instrument_type: OKXInstrumentType,
1199 instrument_family: Option<String>,
1200 ) -> anyhow::Result<Vec<InstrumentAny>> {
1201 let mut params = GetInstrumentsParamsBuilder::default();
1202 params.inst_type(instrument_type);
1203
1204 if let Some(family) = instrument_family.clone() {
1205 params.inst_family(family);
1206 }
1207
1208 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1209
1210 let resp = self
1211 .inner
1212 .http_get_instruments(params)
1213 .await
1214 .map_err(|e| anyhow::anyhow!(e))?;
1215
1216 let fee_rate_opt = {
1217 let fee_params = GetTradeFeeParams {
1218 inst_type: instrument_type,
1219 uly: None,
1220 inst_family: instrument_family,
1221 };
1222
1223 match self.inner.http_get_trade_fee(fee_params).await {
1224 Ok(rates) => rates.into_iter().next(),
1225 Err(OKXHttpError::MissingCredentials) => {
1226 log::debug!("Missing credentials for fee rates, using None");
1227 None
1228 }
1229 Err(e) => {
1230 log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1231 None
1232 }
1233 }
1234 };
1235
1236 let ts_init = self.generate_ts_init();
1237
1238 let mut instruments: Vec<InstrumentAny> = Vec::new();
1239 for inst in &resp {
1240 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1242 let is_usdt_margined =
1243 inst.ct_type == crate::common::enums::OKXContractType::Linear;
1244 let (maker_str, taker_str) = if is_usdt_margined {
1245 (&fee_rate.maker_u, &fee_rate.taker_u)
1246 } else {
1247 (&fee_rate.maker, &fee_rate.taker)
1248 };
1249
1250 let maker = if !maker_str.is_empty() {
1251 Decimal::from_str(maker_str).ok()
1252 } else {
1253 None
1254 };
1255 let taker = if !taker_str.is_empty() {
1256 Decimal::from_str(taker_str).ok()
1257 } else {
1258 None
1259 };
1260
1261 (maker, taker)
1262 } else {
1263 (None, None)
1264 };
1265
1266 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1267 Ok(Some(instrument_any)) => {
1268 instruments.push(instrument_any);
1269 }
1270 Ok(None) => {
1271 }
1273 Err(e) => {
1274 log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1275 }
1276 }
1277 }
1278
1279 Ok(instruments)
1280 }
1281
1282 pub async fn request_mark_price(
1288 &self,
1289 instrument_id: InstrumentId,
1290 ) -> anyhow::Result<MarkPriceUpdate> {
1291 let mut params = GetMarkPriceParamsBuilder::default();
1292 params.inst_id(instrument_id.symbol.inner());
1293 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1294
1295 let resp = self
1296 .inner
1297 .http_get_mark_price(params)
1298 .await
1299 .map_err(|e| anyhow::anyhow!(e))?;
1300
1301 let raw = resp
1302 .first()
1303 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1304 let inst = self
1305 .instrument_or_fetch(instrument_id.symbol.inner())
1306 .await?;
1307 let ts_init = self.generate_ts_init();
1308
1309 let mark_price =
1310 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1311 .map_err(|e| anyhow::anyhow!(e))?;
1312 Ok(mark_price)
1313 }
1314
1315 pub async fn request_index_price(
1321 &self,
1322 instrument_id: InstrumentId,
1323 ) -> anyhow::Result<IndexPriceUpdate> {
1324 let mut params = GetIndexTickerParamsBuilder::default();
1325 params.inst_id(instrument_id.symbol.inner());
1326 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1327
1328 let resp = self
1329 .inner
1330 .http_get_index_ticker(params)
1331 .await
1332 .map_err(|e| anyhow::anyhow!(e))?;
1333
1334 let raw = resp
1335 .first()
1336 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1337 let inst = self
1338 .instrument_or_fetch(instrument_id.symbol.inner())
1339 .await?;
1340 let ts_init = self.generate_ts_init();
1341
1342 let index_price =
1343 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1344 .map_err(|e| anyhow::anyhow!(e))?;
1345 Ok(index_price)
1346 }
1347
1348 pub async fn request_trades(
1354 &self,
1355 instrument_id: InstrumentId,
1356 start: Option<DateTime<Utc>>,
1357 end: Option<DateTime<Utc>>,
1358 limit: Option<u32>,
1359 ) -> anyhow::Result<Vec<TradeTick>> {
1360 let mut params = GetTradesParamsBuilder::default();
1361
1362 params.inst_id(instrument_id.symbol.inner());
1363 if let Some(s) = start {
1364 params.after(s.timestamp_millis().to_string());
1365 }
1366 if let Some(e) = end {
1367 params.before(e.timestamp_millis().to_string());
1368 }
1369 const OKX_TRADES_MAX_LIMIT: u32 = 100;
1373 if let Some(l) = limit
1374 && l > 0
1375 {
1376 params.limit(l.min(OKX_TRADES_MAX_LIMIT));
1377 }
1378
1379 let params = params.build().map_err(anyhow::Error::new)?;
1380
1381 let raw_trades = self
1383 .inner
1384 .http_get_trades(params)
1385 .await
1386 .map_err(anyhow::Error::new)?;
1387
1388 let ts_init = self.generate_ts_init();
1389 let inst = self
1390 .instrument_or_fetch(instrument_id.symbol.inner())
1391 .await?;
1392
1393 let mut trades = Vec::with_capacity(raw_trades.len());
1394 for raw in raw_trades {
1395 match parse_trade_tick(
1396 &raw,
1397 instrument_id,
1398 inst.price_precision(),
1399 inst.size_precision(),
1400 ts_init,
1401 ) {
1402 Ok(trade) => trades.push(trade),
1403 Err(e) => tracing::error!("{e}"),
1404 }
1405 }
1406
1407 Ok(trades)
1408 }
1409
1410 pub async fn request_bars(
1455 &self,
1456 bar_type: BarType,
1457 start: Option<DateTime<Utc>>,
1458 mut end: Option<DateTime<Utc>>,
1459 limit: Option<u32>,
1460 ) -> anyhow::Result<Vec<Bar>> {
1461 const HISTORY_SPLIT_DAYS: i64 = 100;
1462 const MAX_PAGES_SOFT: usize = 500;
1463
1464 let limit = if limit == Some(0) { None } else { limit };
1465
1466 anyhow::ensure!(
1467 bar_type.aggregation_source() == AggregationSource::External,
1468 "Only EXTERNAL aggregation is supported"
1469 );
1470 if let (Some(s), Some(e)) = (start, end) {
1471 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1472 }
1473
1474 let now = Utc::now();
1475 if let Some(s) = start
1476 && s > now
1477 {
1478 return Ok(Vec::new());
1479 }
1480 if let Some(e) = end
1481 && e > now
1482 {
1483 end = Some(now);
1484 }
1485
1486 let spec = bar_type.spec();
1487 let step = spec.step.get();
1488 let bar_param = match spec.aggregation {
1489 BarAggregation::Second => format!("{step}s"),
1490 BarAggregation::Minute => format!("{step}m"),
1491 BarAggregation::Hour => format!("{step}H"),
1492 BarAggregation::Day => format!("{step}D"),
1493 BarAggregation::Week => format!("{step}W"),
1494 BarAggregation::Month => format!("{step}M"),
1495 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1496 };
1497
1498 let slot_ms: i64 = match spec.aggregation {
1499 BarAggregation::Second => (step as i64) * 1_000,
1500 BarAggregation::Minute => (step as i64) * 60_000,
1501 BarAggregation::Hour => (step as i64) * 3_600_000,
1502 BarAggregation::Day => (step as i64) * 86_400_000,
1503 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1504 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1505 _ => unreachable!("Unsupported aggregation should have been caught above"),
1506 };
1507 let slot_ns: i64 = slot_ms * 1_000_000;
1508
1509 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1510 enum Mode {
1511 Latest,
1512 Backward,
1513 Range,
1514 }
1515
1516 let mode = match (start, end) {
1517 (None, None) => Mode::Latest,
1518 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
1520 (Some(_), Some(_)) => Mode::Range,
1521 };
1522
1523 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1524 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1525
1526 let start_ms = start.map(|s| {
1528 let ms = s.timestamp_millis();
1529 if slot_ms > 0 {
1530 (ms / slot_ms) * slot_ms } else {
1532 ms
1533 }
1534 });
1535 let end_ms = end.map(|e| {
1536 let ms = e.timestamp_millis();
1537 if slot_ms > 0 {
1538 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
1540 ms
1541 }
1542 });
1543 let now_ms = now.timestamp_millis();
1544
1545 let symbol = bar_type.instrument_id().symbol;
1546 let inst = self.instrument_or_fetch(symbol.inner()).await?;
1547
1548 let mut out: Vec<Bar> = Vec::new();
1549 let mut pages = 0usize;
1550
1551 let mut after_ms: Option<i64> = None;
1556 let mut before_ms: Option<i64> = match mode {
1557 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1558 Mode::Range => {
1559 Some(end_ms.unwrap_or(now_ms))
1562 }
1563 Mode::Latest => None,
1564 };
1565
1566 let mut forward_prepend_mode = matches!(mode, Mode::Range);
1568
1569 if matches!(mode, Mode::Backward | Mode::Range)
1573 && let Some(b) = before_ms
1574 {
1575 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
1581 before_ms = Some(now_ms.saturating_sub(buffer_ms));
1582 }
1583 }
1584
1585 let mut have_latest_first_page = false;
1586 let mut progressless_loops = 0u8;
1587
1588 loop {
1589 if let Some(lim) = limit
1590 && lim > 0
1591 && out.len() >= lim as usize
1592 {
1593 break;
1594 }
1595 if pages >= MAX_PAGES_SOFT {
1596 break;
1597 }
1598
1599 let pivot_ms = if let Some(a) = after_ms {
1600 a
1601 } else if let Some(b) = before_ms {
1602 b
1603 } else {
1604 now_ms
1605 };
1606 let age_ms = now_ms.saturating_sub(pivot_ms);
1612 let age_hours = age_ms / (60 * 60 * 1000);
1613 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
1616 let remaining = limit
1617 .filter(|&l| l > 0) .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
1619 let page_cap = remaining.min(page_ceiling);
1620
1621 let mut p = GetCandlesticksParamsBuilder::default();
1622 p.inst_id(symbol.as_str())
1623 .bar(&bar_param)
1624 .limit(page_cap as u32);
1625
1626 let mut req_used_before = false;
1628
1629 match mode {
1630 Mode::Latest => {
1631 if have_latest_first_page && let Some(b) = before_ms {
1632 p.before_ms(b);
1633 req_used_before = true;
1634 }
1635 }
1636 Mode::Backward => {
1637 if let Some(b) = before_ms {
1638 p.before_ms(b);
1639 req_used_before = true;
1640 }
1641 }
1642 Mode::Range => {
1643 if pages == 0 && !using_history {
1646 } else if forward_prepend_mode {
1649 if let Some(b) = before_ms {
1650 p.before_ms(b);
1651 req_used_before = true;
1652 }
1653 } else if let Some(a) = after_ms {
1654 p.after_ms(a);
1655 }
1656 }
1657 }
1658
1659 let params = p.build().map_err(anyhow::Error::new)?;
1660
1661 let mut raw = if using_history {
1662 self.inner
1663 .http_get_candlesticks_history(params.clone())
1664 .await
1665 .map_err(anyhow::Error::new)?
1666 } else {
1667 self.inner
1668 .http_get_candlesticks(params.clone())
1669 .await
1670 .map_err(anyhow::Error::new)?
1671 };
1672
1673 if raw.is_empty() {
1675 if matches!(mode, Mode::Latest)
1677 && have_latest_first_page
1678 && !using_history
1679 && let Some(b) = before_ms
1680 {
1681 let mut p2 = GetCandlesticksParamsBuilder::default();
1682 p2.inst_id(symbol.as_str())
1683 .bar(&bar_param)
1684 .limit(page_cap as u32);
1685 p2.before_ms(b);
1686 let params2 = p2.build().map_err(anyhow::Error::new)?;
1687 let raw2 = self
1688 .inner
1689 .http_get_candlesticks_history(params2)
1690 .await
1691 .map_err(anyhow::Error::new)?;
1692 if !raw2.is_empty() {
1693 raw = raw2;
1694 } else {
1695 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1697 before_ms = Some(b.saturating_sub(jump));
1698 progressless_loops = progressless_loops.saturating_add(1);
1699 if progressless_loops >= 3 {
1700 break;
1701 }
1702 continue;
1703 }
1704 }
1705
1706 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1710 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1711 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1712
1713 let mut p2 = GetCandlesticksParamsBuilder::default();
1714 p2.inst_id(symbol.as_str())
1715 .bar(&bar_param)
1716 .limit(page_cap as u32)
1717 .before_ms(pivot_back);
1718 let params2 = p2.build().map_err(anyhow::Error::new)?;
1719 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1720 > HISTORY_SPLIT_DAYS
1721 {
1722 self.inner.http_get_candlesticks_history(params2).await
1723 } else {
1724 self.inner.http_get_candlesticks(params2).await
1725 }
1726 .map_err(anyhow::Error::new)?;
1727 if raw2.is_empty() {
1728 break;
1729 } else {
1730 raw = raw2;
1731 forward_prepend_mode = true;
1732 req_used_before = true;
1733 }
1734 }
1735
1736 if raw.is_empty()
1738 && matches!(mode, Mode::Latest)
1739 && !have_latest_first_page
1740 && !using_history
1741 {
1742 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1743 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1744 have_latest_first_page = true;
1745 continue;
1746 }
1747
1748 if raw.is_empty() {
1750 break;
1751 }
1752 }
1753 pages += 1;
1756
1757 let ts_init = self.generate_ts_init();
1759 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1760 for r in &raw {
1761 page.push(parse_candlestick(
1762 r,
1763 bar_type,
1764 inst.price_precision(),
1765 inst.size_precision(),
1766 ts_init,
1767 )?);
1768 }
1769 page.reverse();
1770
1771 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1772 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1773
1774 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1778 && out.is_empty()
1779 && pages < 2
1780 {
1781 let tolerance_ns = slot_ns * 2; if !page.is_empty() {
1788 tracing::debug!(
1789 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1790 page.len(),
1791 page.first().unwrap().ts_event.as_i64() / 1_000_000,
1792 page.last().unwrap().ts_event.as_i64() / 1_000_000,
1793 start_ms,
1794 end_ms
1795 );
1796 }
1797
1798 let result: Vec<Bar> = page
1799 .clone()
1800 .into_iter()
1801 .filter(|b| {
1802 let ts = b.ts_event.as_i64();
1803 let ok_after =
1805 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1806 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1807 ok_after && ok_before
1808 })
1809 .collect();
1810
1811 result
1812 } else {
1813 page.clone()
1815 .into_iter()
1816 .filter(|b| {
1817 let ts = b.ts_event.as_i64();
1818 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1819 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1820 ok_after && ok_before
1821 })
1822 .collect()
1823 };
1824
1825 if !page.is_empty() && filtered.is_empty() {
1826 if matches!(mode, Mode::Range)
1828 && !forward_prepend_mode
1829 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1830 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1831 {
1832 break;
1834 }
1835 }
1836
1837 let contribution;
1839
1840 if out.is_empty() {
1841 contribution = filtered.len();
1842 out = filtered;
1843 } else {
1844 match mode {
1845 Mode::Backward | Mode::Latest => {
1846 if let Some(first) = out.first() {
1847 filtered.retain(|b| b.ts_event < first.ts_event);
1848 }
1849 contribution = filtered.len();
1850 if contribution != 0 {
1851 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1852 new_out.extend_from_slice(&filtered);
1853 new_out.extend_from_slice(&out);
1854 out = new_out;
1855 }
1856 }
1857 Mode::Range => {
1858 if forward_prepend_mode || req_used_before {
1859 if let Some(first) = out.first() {
1861 filtered.retain(|b| b.ts_event < first.ts_event);
1862 }
1863 contribution = filtered.len();
1864 if contribution != 0 {
1865 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1866 new_out.extend_from_slice(&filtered);
1867 new_out.extend_from_slice(&out);
1868 out = new_out;
1869 }
1870 } else {
1871 if let Some(last) = out.last() {
1873 filtered.retain(|b| b.ts_event > last.ts_event);
1874 }
1875 contribution = filtered.len();
1876 out.extend(filtered);
1877 }
1878 }
1879 }
1880 }
1881
1882 if contribution == 0
1884 && matches!(mode, Mode::Latest | Mode::Backward)
1885 && let Some(b) = before_ms
1886 {
1887 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1888 let new_b = b.saturating_sub(jump);
1889 if new_b != b {
1890 before_ms = Some(new_b);
1891 }
1892 }
1893
1894 if contribution == 0 {
1895 progressless_loops = progressless_loops.saturating_add(1);
1896 if progressless_loops >= 3 {
1897 break;
1898 }
1899 } else {
1900 progressless_loops = 0;
1901
1902 match mode {
1904 Mode::Latest | Mode::Backward => {
1905 if let Some(oldest) = page_oldest_ms {
1906 before_ms = Some(oldest.saturating_sub(1));
1907 have_latest_first_page = true;
1908 } else {
1909 break;
1910 }
1911 }
1912 Mode::Range => {
1913 if forward_prepend_mode || req_used_before {
1914 if let Some(oldest) = page_oldest_ms {
1915 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
1918 after_ms = None;
1919 } else {
1920 break;
1921 }
1922 } else if let Some(newest) = page_newest_ms {
1923 after_ms = Some(newest.saturating_add(1));
1924 before_ms = None;
1925 } else {
1926 break;
1927 }
1928 }
1929 }
1930 }
1931
1932 if let Some(lim) = limit
1934 && lim > 0
1935 && out.len() >= lim as usize
1936 {
1937 break;
1938 }
1939 if let Some(ens) = end_ns
1940 && let Some(last) = out.last()
1941 && last.ts_event.as_i64() >= ens
1942 {
1943 break;
1944 }
1945 if let Some(sns) = start_ns
1946 && let Some(first) = out.first()
1947 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1948 && first.ts_event.as_i64() <= sns
1949 {
1950 if matches!(mode, Mode::Range) {
1952 if let Some(ens) = end_ns
1954 && let Some(last) = out.last()
1955 {
1956 let last_ts = last.ts_event.as_i64();
1957 if last_ts < ens {
1958 forward_prepend_mode = false;
1961 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1962 before_ms = None;
1963 continue;
1964 }
1965 }
1966 }
1967 break;
1968 }
1969
1970 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1971 }
1972
1973 if out.is_empty() && matches!(mode, Mode::Range) {
1975 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1976 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1977 let mut p = GetCandlesticksParamsBuilder::default();
1978 p.inst_id(symbol.as_str())
1979 .bar(&bar_param)
1980 .limit(300)
1981 .before_ms(pivot);
1982 let params = p.build().map_err(anyhow::Error::new)?;
1983 let raw = if hist {
1984 self.inner.http_get_candlesticks_history(params).await
1985 } else {
1986 self.inner.http_get_candlesticks(params).await
1987 }
1988 .map_err(anyhow::Error::new)?;
1989 if !raw.is_empty() {
1990 let ts_init = self.generate_ts_init();
1991 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1992 for r in &raw {
1993 page.push(parse_candlestick(
1994 r,
1995 bar_type,
1996 inst.price_precision(),
1997 inst.size_precision(),
1998 ts_init,
1999 )?);
2000 }
2001 page.reverse();
2002 out = page
2003 .into_iter()
2004 .filter(|b| {
2005 let ts = b.ts_event.as_i64();
2006 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2007 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2008 ok_after && ok_before
2009 })
2010 .collect();
2011 }
2012 }
2013
2014 if let Some(ens) = end_ns {
2016 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2017 out.pop();
2018 }
2019 }
2020
2021 if matches!(mode, Mode::Range)
2023 && !forward_prepend_mode
2024 && let Some(sns) = start_ns
2025 {
2026 let lower = sns.saturating_sub(slot_ns);
2027 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2028 out.remove(0);
2029 }
2030 }
2031
2032 if let Some(lim) = limit
2033 && lim > 0
2034 && out.len() > lim as usize
2035 {
2036 out.truncate(lim as usize);
2037 }
2038
2039 Ok(out)
2040 }
2041
2042 #[allow(clippy::too_many_arguments)]
2053 pub async fn request_order_status_reports(
2054 &self,
2055 account_id: AccountId,
2056 instrument_type: Option<OKXInstrumentType>,
2057 instrument_id: Option<InstrumentId>,
2058 start: Option<DateTime<Utc>>,
2059 end: Option<DateTime<Utc>>,
2060 open_only: bool,
2061 limit: Option<u32>,
2062 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2063 let mut history_params = GetOrderHistoryParamsBuilder::default();
2064
2065 let instrument_type = if let Some(instrument_type) = instrument_type {
2066 instrument_type
2067 } else {
2068 let instrument_id = instrument_id.ok_or_else(|| {
2069 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2070 })?;
2071 let instrument = self
2072 .instrument_or_fetch(instrument_id.symbol.inner())
2073 .await?;
2074 okx_instrument_type(&instrument)?
2075 };
2076
2077 history_params.inst_type(instrument_type);
2078
2079 if let Some(instrument_id) = instrument_id.as_ref() {
2080 history_params.inst_id(instrument_id.symbol.inner().to_string());
2081 }
2082
2083 if let Some(limit) = limit {
2084 history_params.limit(limit);
2085 }
2086
2087 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2088
2089 let mut pending_params = GetOrderListParamsBuilder::default();
2090 pending_params.inst_type(instrument_type);
2091
2092 if let Some(instrument_id) = instrument_id.as_ref() {
2093 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2094 }
2095
2096 if let Some(limit) = limit {
2097 pending_params.limit(limit);
2098 }
2099
2100 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2101
2102 let combined_resp = if open_only {
2103 self.inner
2105 .http_get_order_list(pending_params)
2106 .await
2107 .map_err(|e| anyhow::anyhow!(e))?
2108 } else {
2109 let (history_resp, pending_resp) = tokio::try_join!(
2111 self.inner.http_get_order_history(history_params),
2112 self.inner.http_get_order_list(pending_params)
2113 )
2114 .map_err(|e| anyhow::anyhow!(e))?;
2115
2116 let mut combined_resp = history_resp;
2118 combined_resp.extend(pending_resp);
2119 combined_resp
2120 };
2121
2122 let start_ns = start.map(UnixNanos::from);
2124 let end_ns = end.map(UnixNanos::from);
2125
2126 let ts_init = self.generate_ts_init();
2127 let mut reports = Vec::with_capacity(combined_resp.len());
2128
2129 let mut seen: AHashSet<String> = AHashSet::new();
2131
2132 for order in combined_resp {
2133 let seen_key = if !order.cl_ord_id.is_empty() {
2134 order.cl_ord_id.as_str().to_string()
2135 } else if let Some(algo_cl_ord_id) = order
2136 .algo_cl_ord_id
2137 .as_ref()
2138 .filter(|value| !value.as_str().is_empty())
2139 {
2140 algo_cl_ord_id.as_str().to_string()
2141 } else if let Some(algo_id) = order
2142 .algo_id
2143 .as_ref()
2144 .filter(|value| !value.as_str().is_empty())
2145 {
2146 algo_id.as_str().to_string()
2147 } else {
2148 order.ord_id.as_str().to_string()
2149 };
2150
2151 if !seen.insert(seen_key) {
2152 continue; }
2154
2155 let inst = self.instrument_or_fetch(order.inst_id).await?;
2156
2157 let report = parse_order_status_report(
2158 &order,
2159 account_id,
2160 inst.id(),
2161 inst.price_precision(),
2162 inst.size_precision(),
2163 ts_init,
2164 );
2165
2166 if let Some(start_ns) = start_ns
2167 && report.ts_last < start_ns
2168 {
2169 continue;
2170 }
2171 if let Some(end_ns) = end_ns
2172 && report.ts_last > end_ns
2173 {
2174 continue;
2175 }
2176
2177 reports.push(report);
2178 }
2179
2180 Ok(reports)
2181 }
2182
2183 pub async fn request_fill_reports(
2193 &self,
2194 account_id: AccountId,
2195 instrument_type: Option<OKXInstrumentType>,
2196 instrument_id: Option<InstrumentId>,
2197 start: Option<DateTime<Utc>>,
2198 end: Option<DateTime<Utc>>,
2199 limit: Option<u32>,
2200 ) -> anyhow::Result<Vec<FillReport>> {
2201 let mut params = GetTransactionDetailsParamsBuilder::default();
2202
2203 let instrument_type = if let Some(instrument_type) = instrument_type {
2204 instrument_type
2205 } else {
2206 let instrument_id = instrument_id.ok_or_else(|| {
2207 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2208 })?;
2209 let instrument = self
2210 .instrument_or_fetch(instrument_id.symbol.inner())
2211 .await?;
2212 okx_instrument_type(&instrument)?
2213 };
2214
2215 params.inst_type(instrument_type);
2216
2217 if let Some(instrument_id) = instrument_id {
2218 let instrument = self
2219 .instrument_or_fetch(instrument_id.symbol.inner())
2220 .await?;
2221 let instrument_type = okx_instrument_type(&instrument)?;
2222 params.inst_type(instrument_type);
2223 params.inst_id(instrument_id.symbol.inner().to_string());
2224 }
2225
2226 if let Some(limit) = limit {
2227 params.limit(limit);
2228 }
2229
2230 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2231
2232 let resp = self
2233 .inner
2234 .http_get_transaction_details(params)
2235 .await
2236 .map_err(|e| anyhow::anyhow!(e))?;
2237
2238 let start_ns = start.map(UnixNanos::from);
2240 let end_ns = end.map(UnixNanos::from);
2241
2242 let ts_init = self.generate_ts_init();
2243 let mut reports = Vec::with_capacity(resp.len());
2244
2245 for detail in resp {
2246 let inst = self.instrument_or_fetch(detail.inst_id).await?;
2247
2248 let report = parse_fill_report(
2249 detail,
2250 account_id,
2251 inst.id(),
2252 inst.price_precision(),
2253 inst.size_precision(),
2254 ts_init,
2255 )?;
2256
2257 if let Some(start_ns) = start_ns
2258 && report.ts_event < start_ns
2259 {
2260 continue;
2261 }
2262
2263 if let Some(end_ns) = end_ns
2264 && report.ts_event > end_ns
2265 {
2266 continue;
2267 }
2268
2269 reports.push(report);
2270 }
2271
2272 Ok(reports)
2273 }
2274
2275 pub async fn request_position_status_reports(
2302 &self,
2303 account_id: AccountId,
2304 instrument_type: Option<OKXInstrumentType>,
2305 instrument_id: Option<InstrumentId>,
2306 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2307 let mut params = GetPositionsParamsBuilder::default();
2308
2309 let instrument_type = if let Some(instrument_type) = instrument_type {
2310 instrument_type
2311 } else {
2312 let instrument_id = instrument_id.ok_or_else(|| {
2313 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2314 })?;
2315 let instrument = self
2316 .instrument_or_fetch(instrument_id.symbol.inner())
2317 .await?;
2318 okx_instrument_type(&instrument)?
2319 };
2320
2321 params.inst_type(instrument_type);
2322
2323 instrument_id
2324 .as_ref()
2325 .map(|i| params.inst_id(i.symbol.inner()));
2326
2327 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2328
2329 let resp = self
2330 .inner
2331 .http_get_positions(params)
2332 .await
2333 .map_err(|e| anyhow::anyhow!(e))?;
2334
2335 let ts_init = self.generate_ts_init();
2336 let mut reports = Vec::with_capacity(resp.len());
2337
2338 for position in resp {
2339 let inst = self.instrument_or_fetch(position.inst_id).await?;
2340
2341 let report = parse_position_status_report(
2342 position,
2343 account_id,
2344 inst.id(),
2345 inst.size_precision(),
2346 ts_init,
2347 )?;
2348 reports.push(report);
2349 }
2350
2351 Ok(reports)
2352 }
2353
2354 pub async fn place_algo_order(
2364 &self,
2365 request: OKXPlaceAlgoOrderRequest,
2366 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2367 let body =
2368 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2369
2370 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2371 .inner
2372 .send_request(Method::POST, "/api/v5/trade/order-algo", Some(body), true)
2373 .await?;
2374
2375 resp.into_iter()
2376 .next()
2377 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2378 }
2379
2380 pub async fn cancel_algo_order(
2390 &self,
2391 request: OKXCancelAlgoOrderRequest,
2392 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2393 let body =
2396 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2397
2398 let resp: Vec<OKXCancelAlgoOrderResponse> = self
2399 .inner
2400 .send_request(Method::POST, "/api/v5/trade/cancel-algos", Some(body), true)
2401 .await?;
2402
2403 resp.into_iter()
2404 .next()
2405 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2406 }
2407
2408 #[allow(clippy::too_many_arguments)]
2417 pub async fn place_algo_order_with_domain_types(
2418 &self,
2419 instrument_id: InstrumentId,
2420 td_mode: OKXTradeMode,
2421 client_order_id: ClientOrderId,
2422 order_side: OrderSide,
2423 order_type: OrderType,
2424 quantity: Quantity,
2425 trigger_price: Price,
2426 trigger_type: Option<TriggerType>,
2427 limit_price: Option<Price>,
2428 reduce_only: Option<bool>,
2429 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2430 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2431 return Err(OKXHttpError::ValidationError(
2432 "Invalid order side".to_string(),
2433 ));
2434 }
2435 let okx_side: OKXSide = order_side.into();
2436
2437 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
2439
2440 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
2442 limit_price.map(|p| p.to_string())
2443 } else {
2444 Some("-1".to_string())
2446 };
2447
2448 let request = OKXPlaceAlgoOrderRequest {
2449 inst_id: instrument_id.symbol.as_str().to_string(),
2450 td_mode,
2451 side: okx_side,
2452 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
2454 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
2455 trigger_px: Some(trigger_price.to_string()),
2456 order_px,
2457 trigger_px_type: Some(trigger_px_type_enum),
2458 tgt_ccy: None, pos_side: None, close_position: None,
2461 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
2462 reduce_only,
2463 };
2464
2465 self.place_algo_order(request).await
2466 }
2467
2468 pub async fn cancel_algo_order_with_domain_types(
2477 &self,
2478 instrument_id: InstrumentId,
2479 algo_id: String,
2480 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2481 let request = OKXCancelAlgoOrderRequest {
2482 inst_id: instrument_id.symbol.to_string(),
2483 algo_id: Some(algo_id),
2484 algo_cl_ord_id: None,
2485 };
2486
2487 self.cancel_algo_order(request).await
2488 }
2489
2490 #[allow(clippy::too_many_arguments)]
2496 pub async fn request_algo_order_status_reports(
2497 &self,
2498 account_id: AccountId,
2499 instrument_type: Option<OKXInstrumentType>,
2500 instrument_id: Option<InstrumentId>,
2501 algo_id: Option<String>,
2502 algo_client_order_id: Option<ClientOrderId>,
2503 state: Option<OKXOrderStatus>,
2504 limit: Option<u32>,
2505 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2506 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
2507
2508 let inst_type = if let Some(inst_type) = instrument_type {
2509 inst_type
2510 } else if let Some(inst_id) = instrument_id {
2511 let instrument = self.instrument_or_fetch(inst_id.symbol.inner()).await?;
2512 let inst_type = okx_instrument_type(&instrument)?;
2513 instruments_cache.insert(inst_id.symbol.inner(), instrument);
2514 inst_type
2515 } else {
2516 anyhow::bail!("instrument_type or instrument_id required for algo order query")
2517 };
2518
2519 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
2520 params_builder.inst_type(inst_type);
2521 if let Some(inst_id) = instrument_id {
2522 params_builder.inst_id(inst_id.symbol.inner().to_string());
2523 }
2524 if let Some(algo_id) = algo_id.as_ref() {
2525 params_builder.algo_id(algo_id.clone());
2526 }
2527 if let Some(client_order_id) = algo_client_order_id.as_ref() {
2528 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
2529 }
2530 if let Some(state) = state {
2531 params_builder.state(state);
2532 }
2533 if let Some(limit) = limit {
2534 params_builder.limit(limit);
2535 }
2536
2537 let params = params_builder
2538 .build()
2539 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
2540
2541 let ts_init = self.generate_ts_init();
2542 let mut reports = Vec::new();
2543 let mut seen: AHashSet<(String, String)> = AHashSet::new();
2544
2545 let pending = match self.inner.http_get_order_algo_pending(params.clone()).await {
2546 Ok(result) => result,
2547 Err(OKXHttpError::UnexpectedStatus { status, .. })
2548 if status == StatusCode::NOT_FOUND =>
2549 {
2550 Vec::new()
2551 }
2552 Err(e) => return Err(e.into()),
2553 };
2554 self.collect_algo_reports(
2555 account_id,
2556 &pending,
2557 &mut instruments_cache,
2558 ts_init,
2559 &mut seen,
2560 &mut reports,
2561 )
2562 .await?;
2563
2564 let history = match self.inner.http_get_order_algo_history(params).await {
2565 Ok(result) => result,
2566 Err(OKXHttpError::UnexpectedStatus { status, .. })
2567 if status == StatusCode::NOT_FOUND =>
2568 {
2569 Vec::new()
2570 }
2571 Err(e) => return Err(e.into()),
2572 };
2573 self.collect_algo_reports(
2574 account_id,
2575 &history,
2576 &mut instruments_cache,
2577 ts_init,
2578 &mut seen,
2579 &mut reports,
2580 )
2581 .await?;
2582
2583 Ok(reports)
2584 }
2585
2586 pub async fn request_algo_order_status_report(
2592 &self,
2593 account_id: AccountId,
2594 instrument_id: InstrumentId,
2595 algo_client_order_id: ClientOrderId,
2596 ) -> anyhow::Result<Option<OrderStatusReport>> {
2597 let reports = self
2598 .request_algo_order_status_reports(
2599 account_id,
2600 None,
2601 Some(instrument_id),
2602 None,
2603 Some(algo_client_order_id),
2604 None,
2605 Some(50_u32),
2606 )
2607 .await?;
2608
2609 Ok(reports.into_iter().next())
2610 }
2611 async fn collect_algo_reports(
2612 &self,
2613 account_id: AccountId,
2614 orders: &[OKXOrderAlgo],
2615 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
2616 ts_init: UnixNanos,
2617 seen: &mut AHashSet<(String, String)>,
2618 reports: &mut Vec<OrderStatusReport>,
2619 ) -> anyhow::Result<()> {
2620 for order in orders {
2621 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
2622 if !seen.insert(key) {
2623 continue;
2624 }
2625
2626 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
2627 instrument.clone()
2628 } else {
2629 let instrument = self.instrument_or_fetch(order.inst_id).await?;
2630 instruments_cache.insert(order.inst_id, instrument.clone());
2631 instrument
2632 };
2633
2634 let report = parse_http_algo_order(order, account_id, &instrument, ts_init)?;
2635 reports.push(report);
2636 }
2637
2638 Ok(())
2639 }
2640}
2641
2642fn parse_http_algo_order(
2643 order: &OKXOrderAlgo,
2644 account_id: AccountId,
2645 instrument: &InstrumentAny,
2646 ts_init: UnixNanos,
2647) -> anyhow::Result<OrderStatusReport> {
2648 let ord_px = if order.ord_px.is_empty() {
2649 "-1".to_string()
2650 } else {
2651 order.ord_px.clone()
2652 };
2653
2654 let reduce_only = if order.reduce_only.is_empty() {
2655 "false".to_string()
2656 } else {
2657 order.reduce_only.clone()
2658 };
2659
2660 let msg = OKXAlgoOrderMsg {
2661 algo_id: order.algo_id.clone(),
2662 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
2663 cl_ord_id: order.cl_ord_id.clone(),
2664 ord_id: order.ord_id.clone(),
2665 inst_id: order.inst_id,
2666 inst_type: order.inst_type,
2667 ord_type: order.ord_type,
2668 state: order.state,
2669 side: order.side,
2670 pos_side: order.pos_side,
2671 sz: order.sz.clone(),
2672 trigger_px: order.trigger_px.clone(),
2673 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
2674 ord_px,
2675 td_mode: order.td_mode,
2676 lever: order.lever.clone(),
2677 reduce_only,
2678 actual_px: order.actual_px.clone(),
2679 actual_sz: order.actual_sz.clone(),
2680 notional_usd: order.notional_usd.clone(),
2681 c_time: order.c_time,
2682 u_time: order.u_time,
2683 trigger_time: order.trigger_time.clone(),
2684 tag: order.tag.clone(),
2685 };
2686
2687 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
2688}