1use std::{
36 collections::HashMap,
37 fmt::Debug,
38 num::NonZeroU32,
39 sync::{Arc, LazyLock, Mutex},
40};
41
42use ahash::{AHashMap, AHashSet};
43use chrono::{DateTime, Utc};
44use nautilus_core::{
45 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var, time::get_atomic_clock_realtime,
46};
47use nautilus_model::{
48 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
49 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TriggerType},
50 events::AccountState,
51 identifiers::{AccountId, ClientOrderId, InstrumentId},
52 instruments::{Instrument, InstrumentAny},
53 reports::{FillReport, OrderStatusReport, PositionStatusReport},
54 types::{Price, Quantity},
55};
56use nautilus_network::{
57 http::HttpClient,
58 ratelimiter::quota::Quota,
59 retry::{RetryConfig, RetryManager},
60};
61use reqwest::{Method, StatusCode, header::USER_AGENT};
62use serde::{Deserialize, Serialize, de::DeserializeOwned};
63use tokio_util::sync::CancellationToken;
64use ustr::Ustr;
65
66use super::{
67 error::OKXHttpError,
68 models::{
69 OKXAccount, OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate,
70 OKXIndexTicker, OKXMarkPrice, OKXOrderAlgo, OKXOrderHistory, OKXPlaceAlgoOrderRequest,
71 OKXPlaceAlgoOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
72 OKXTransactionDetail,
73 },
74 query::{
75 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
76 GetCandlesticksParamsBuilder, GetIndexTickerParams, GetIndexTickerParamsBuilder,
77 GetInstrumentsParams, GetInstrumentsParamsBuilder, GetMarkPriceParams,
78 GetMarkPriceParamsBuilder, GetOrderHistoryParams, GetOrderHistoryParamsBuilder,
79 GetOrderListParams, GetOrderListParamsBuilder, GetPositionTiersParams,
80 GetPositionsHistoryParams, GetPositionsParams, GetPositionsParamsBuilder,
81 GetTradeFeeParams, GetTradesParams, GetTradesParamsBuilder, GetTransactionDetailsParams,
82 GetTransactionDetailsParamsBuilder, SetPositionModeParams, SetPositionModeParamsBuilder,
83 },
84};
85use crate::{
86 common::{
87 consts::{OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID, should_retry_error_code},
88 credential::Credential,
89 enums::{
90 OKXAlgoOrderType, OKXInstrumentType, OKXOrderStatus, OKXPositionMode, OKXSide,
91 OKXTradeMode, OKXTriggerType, OKXVipLevel,
92 },
93 models::OKXInstrument,
94 parse::{
95 okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
96 parse_index_price_update, parse_instrument_any, parse_mark_price_update,
97 parse_order_status_report, parse_position_status_report, parse_trade_tick,
98 },
99 },
100 http::{
101 models::{OKXCandlestick, OKXTrade},
102 query::{GetOrderParams, GetPendingOrdersParams},
103 },
104 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
105};
106
/// Response `code` value that this client treats as a successful OKX reply.
const OKX_SUCCESS_CODE: &str = "0";

/// Default REST quota of 250 requests per second.
///
/// Registered under the global rate-limit key and also handed to the HTTP client
/// as its default quota.
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));

/// Rate-limit key attached to every request in addition to its per-route key.
const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
121
/// Generic envelope deserialized from OKX REST responses.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// Response code; the client compares it against `"0"` to detect success.
    pub code: String,
    /// Message accompanying the code; surfaced as the error message on failure.
    pub msg: String,
    /// Payload items of type `T`.
    pub data: Vec<T>,
}
132
/// Low-level OKX HTTP client that signs, rate-limits, retries and sends raw requests.
pub struct OKXHttpInnerClient {
    /// Base URL that request paths are appended to.
    base_url: String,
    /// Underlying HTTP client configured with default headers and rate-limit quotas.
    client: HttpClient,
    /// API credential used for request signing; `None` for unauthenticated clients.
    credential: Option<Credential>,
    /// Retry policy executor wrapping each request.
    retry_manager: RetryManager<OKXHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// When `true`, the `x-simulated-trading` header is added to default headers.
    is_demo: bool,
}
146
147impl Default for OKXHttpInnerClient {
148 fn default() -> Self {
149 Self::new(None, Some(60), None, None, None, false)
150 .expect("Failed to create default OKXHttpInnerClient")
151 }
152}
153
154impl Debug for OKXHttpInnerClient {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 let credential = self.credential.as_ref().map(|_| "<redacted>");
157 f.debug_struct(stringify!(OKXHttpInnerClient))
158 .field("base_url", &self.base_url)
159 .field("credential", &credential)
160 .finish_non_exhaustive()
161 }
162}
163
164impl OKXHttpInnerClient {
165 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
166 vec![
167 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
168 (
169 "okx:/api/v5/account/balance".to_string(),
170 Quota::per_second(NonZeroU32::new(5).unwrap()),
171 ),
172 (
173 "okx:/api/v5/public/instruments".to_string(),
174 Quota::per_second(NonZeroU32::new(10).unwrap()),
175 ),
176 (
177 "okx:/api/v5/market/candles".to_string(),
178 Quota::per_second(NonZeroU32::new(50).unwrap()),
179 ),
180 (
181 "okx:/api/v5/market/history-candles".to_string(),
182 Quota::per_second(NonZeroU32::new(20).unwrap()),
183 ),
184 (
185 "okx:/api/v5/market/history-trades".to_string(),
186 Quota::per_second(NonZeroU32::new(30).unwrap()),
187 ),
188 (
189 "okx:/api/v5/trade/order".to_string(),
190 Quota::per_second(NonZeroU32::new(30).unwrap()), ),
192 (
193 "okx:/api/v5/trade/orders-pending".to_string(),
194 Quota::per_second(NonZeroU32::new(20).unwrap()),
195 ),
196 (
197 "okx:/api/v5/trade/orders-history".to_string(),
198 Quota::per_second(NonZeroU32::new(20).unwrap()),
199 ),
200 (
201 "okx:/api/v5/trade/fills".to_string(),
202 Quota::per_second(NonZeroU32::new(30).unwrap()),
203 ),
204 (
205 "okx:/api/v5/trade/order-algo".to_string(),
206 Quota::per_second(NonZeroU32::new(10).unwrap()),
207 ),
208 (
209 "okx:/api/v5/trade/cancel-algos".to_string(),
210 Quota::per_second(NonZeroU32::new(10).unwrap()),
211 ),
212 ]
213 }
214
215 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
216 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
217 let route = format!("okx:{normalized}");
218
219 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
220 }
221
222 pub fn cancel_all_requests(&self) {
224 self.cancellation_token.cancel();
225 }
226
227 pub fn cancellation_token(&self) -> &CancellationToken {
229 &self.cancellation_token
230 }
231
232 pub fn new(
242 base_url: Option<String>,
243 timeout_secs: Option<u64>,
244 max_retries: Option<u32>,
245 retry_delay_ms: Option<u64>,
246 retry_delay_max_ms: Option<u64>,
247 is_demo: bool,
248 ) -> Result<Self, OKXHttpError> {
249 let retry_config = RetryConfig {
250 max_retries: max_retries.unwrap_or(3),
251 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
252 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
253 backoff_factor: 2.0,
254 jitter_ms: 1000,
255 operation_timeout_ms: Some(60_000),
256 immediate_first: false,
257 max_elapsed_ms: Some(180_000),
258 };
259
260 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
261 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
262 })?;
263
264 Ok(Self {
265 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
266 client: HttpClient::new(
267 Self::default_headers(is_demo),
268 vec![],
269 Self::rate_limiter_quotas(),
270 Some(*OKX_REST_QUOTA),
271 timeout_secs,
272 ),
273 credential: None,
274 retry_manager,
275 cancellation_token: CancellationToken::new(),
276 is_demo,
277 })
278 }
279
280 #[allow(clippy::too_many_arguments)]
287 pub fn with_credentials(
288 api_key: String,
289 api_secret: String,
290 api_passphrase: String,
291 base_url: String,
292 timeout_secs: Option<u64>,
293 max_retries: Option<u32>,
294 retry_delay_ms: Option<u64>,
295 retry_delay_max_ms: Option<u64>,
296 is_demo: bool,
297 ) -> Result<Self, OKXHttpError> {
298 let retry_config = RetryConfig {
299 max_retries: max_retries.unwrap_or(3),
300 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
301 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
302 backoff_factor: 2.0,
303 jitter_ms: 1000,
304 operation_timeout_ms: Some(60_000),
305 immediate_first: false,
306 max_elapsed_ms: Some(180_000),
307 };
308
309 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
310 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
311 })?;
312
313 Ok(Self {
314 base_url,
315 client: HttpClient::new(
316 Self::default_headers(is_demo),
317 vec![],
318 Self::rate_limiter_quotas(),
319 Some(*OKX_REST_QUOTA),
320 timeout_secs,
321 ),
322 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
323 retry_manager,
324 cancellation_token: CancellationToken::new(),
325 is_demo,
326 })
327 }
328
329 fn default_headers(is_demo: bool) -> HashMap<String, String> {
331 let mut headers =
332 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
333
334 if is_demo {
335 headers.insert("x-simulated-trading".to_string(), "1".to_string());
336 }
337
338 headers
339 }
340
341 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
347 let query = serde_urlencoded::to_string(params)
348 .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
349 if query.is_empty() {
350 Ok(base.to_owned())
351 } else {
352 Ok(format!("{base}?{query}"))
353 }
354 }
355
356 fn sign_request(
363 &self,
364 method: &Method,
365 path: &str,
366 body: Option<&[u8]>,
367 ) -> Result<HashMap<String, String>, OKXHttpError> {
368 let credential = match self.credential.as_ref() {
369 Some(c) => c,
370 None => return Err(OKXHttpError::MissingCredentials),
371 };
372
373 let api_key = credential.api_key.to_string();
374 let api_passphrase = credential.api_passphrase.to_string();
375
376 let now = Utc::now();
378 let millis = now.timestamp_subsec_millis();
379 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{:03}Z", millis);
380 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
381
382 let mut headers = HashMap::new();
383 headers.insert("OK-ACCESS-KEY".to_string(), api_key.clone());
384 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
385 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp.clone());
386 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
387
388 Ok(headers)
389 }
390
    /// Sends an HTTP request to OKX and returns the `data` array of the response envelope.
    ///
    /// * `method` - HTTP method to use.
    /// * `path` - request path (including any query string), appended to the base URL.
    /// * `body` - optional request body; when present a JSON content type header is set.
    /// * `authenticate` - when `true`, the request is signed with the client's credential.
    ///
    /// The request is executed through the retry manager and can be interrupted via
    /// the client's cancellation token.
    ///
    /// # Errors
    ///
    /// Returns an error if signing fails, the HTTP client fails, the response cannot
    /// be deserialized, OKX returns a non-success `code`, or the HTTP status is not
    /// a success status.
    async fn send_request<T: DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<Vec<T>, OKXHttpError> {
        let url = format!("{}{path}", self.base_url);
        let endpoint = path.to_string();
        let method_clone = method.clone();
        let body_clone = body.clone();

        // Each invocation of the closure works on its own owned copies of the inputs
        let operation = || {
            let url = url.clone();
            let method = method_clone.clone();
            let body = body_clone.clone();
            let endpoint = endpoint.clone();

            async move {
                // Signing happens inside the operation, so the timestamp and signature
                // are regenerated every time the closure runs
                let mut headers = if authenticate {
                    self.sign_request(&method, endpoint.as_str(), body.as_deref())?
                } else {
                    HashMap::new()
                };

                if body.is_some() {
                    headers.insert("Content-Type".to_string(), "application/json".to_string());
                }

                // Global key plus a per-route key derived from the path without its query
                let rate_keys = Self::rate_limit_keys(endpoint.as_str());
                let resp = self
                    .client
                    .request_with_ustr_keys(
                        method.clone(),
                        url,
                        Some(headers),
                        body,
                        None,
                        Some(rate_keys),
                    )
                    .await?;

                tracing::trace!("Response: {resp:?}");

                if resp.status.is_success() {
                    let okx_response: OKXResponse<T> =
                        serde_json::from_slice(&resp.body).map_err(|e| {
                            tracing::error!("Failed to deserialize OKXResponse: {e}");
                            OKXHttpError::JsonError(e.to_string())
                        })?;

                    // A successful HTTP status can still carry an OKX-level error code
                    if okx_response.code != OKX_SUCCESS_CODE {
                        return Err(OKXHttpError::OkxError {
                            error_code: okx_response.code,
                            message: okx_response.msg,
                        });
                    }

                    Ok(okx_response.data)
                } else {
                    let error_body = String::from_utf8_lossy(&resp.body);
                    // 404 responses are logged at debug level, everything else as an error
                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
                        tracing::debug!("HTTP 404 with body: {error_body}");
                    } else {
                        tracing::error!(
                            "HTTP error {} with body: {error_body}",
                            resp.status.as_str()
                        );
                    }

                    // Prefer the structured OKX error when the body parses as an envelope
                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
                        return Err(OKXHttpError::OkxError {
                            error_code: parsed_error.code,
                            message: parsed_error.msg,
                        });
                    }

                    Err(OKXHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
                        body: error_body.to_string(),
                    })
                }
            }
        };

        // Retry on HTTP client errors, 5xx/429 statuses, and OKX error codes that
        // `should_retry_error_code` accepts; every other error is final
        let should_retry = |error: &OKXHttpError| -> bool {
            match error {
                OKXHttpError::HttpClientError(_) => true,
                OKXHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
                _ => false,
            }
        };

        // Converts messages produced by the retry manager into validation errors;
        // the literal "canceled" message is rewritten to "Request canceled"
        let create_error = |msg: String| -> OKXHttpError {
            if msg == "canceled" {
                OKXHttpError::ValidationError("Request canceled".to_string())
            } else {
                OKXHttpError::ValidationError(msg)
            }
        };

        self.retry_manager
            .execute_with_retry_with_cancel(
                endpoint.as_str(),
                operation,
                should_retry,
                create_error,
                &self.cancellation_token,
            )
            .await
    }
529
530 pub async fn http_set_position_mode(
541 &self,
542 params: SetPositionModeParams,
543 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
544 let path = "/api/v5/account/set-position-mode";
545 let body = serde_json::to_vec(¶ms)?;
546 self.send_request(Method::POST, path, Some(body), true)
547 .await
548 }
549
550 pub async fn http_get_position_tiers(
561 &self,
562 params: GetPositionTiersParams,
563 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
564 let path = Self::build_path("/api/v5/public/position-tiers", ¶ms)?;
565 self.send_request(Method::GET, &path, None, false).await
566 }
567
568 pub async fn http_get_instruments(
579 &self,
580 params: GetInstrumentsParams,
581 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
582 let path = Self::build_path("/api/v5/public/instruments", ¶ms)?;
583 self.send_request(Method::GET, &path, None, false).await
584 }
585
586 pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
600 let response: Vec<OKXServerTime> = self
601 .send_request(Method::GET, "/api/v5/public/time", None, false)
602 .await?;
603 response
604 .first()
605 .map(|t| t.ts)
606 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
607 }
608
609 pub async fn http_get_mark_price(
623 &self,
624 params: GetMarkPriceParams,
625 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
626 let path = Self::build_path("/api/v5/public/mark-price", ¶ms)?;
627 self.send_request(Method::GET, &path, None, false).await
628 }
629
630 pub async fn http_get_index_ticker(
640 &self,
641 params: GetIndexTickerParams,
642 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
643 let path = Self::build_path("/api/v5/market/index-tickers", ¶ms)?;
644 self.send_request(Method::GET, &path, None, false).await
645 }
646
647 pub async fn http_get_trades(
657 &self,
658 params: GetTradesParams,
659 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
660 let path = Self::build_path("/api/v5/market/history-trades", ¶ms)?;
661 self.send_request(Method::GET, &path, None, false).await
662 }
663
664 pub async fn http_get_candlesticks(
674 &self,
675 params: GetCandlesticksParams,
676 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
677 let path = Self::build_path("/api/v5/market/candles", ¶ms)?;
678 self.send_request(Method::GET, &path, None, false).await
679 }
680
681 pub async fn http_get_candlesticks_history(
691 &self,
692 params: GetCandlesticksParams,
693 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
694 let path = Self::build_path("/api/v5/market/history-candles", ¶ms)?;
695 self.send_request(Method::GET, &path, None, false).await
696 }
697
698 pub async fn http_get_pending_orders(
708 &self,
709 params: GetPendingOrdersParams,
710 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
711 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
712 self.send_request(Method::GET, &path, None, true).await
713 }
714
715 pub async fn http_get_order(
725 &self,
726 params: GetOrderParams,
727 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
728 let path = Self::build_path("/api/v5/trade/order", ¶ms)?;
729 self.send_request(Method::GET, &path, None, true).await
730 }
731
732 pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
743 let path = "/api/v5/account/balance";
744 self.send_request(Method::GET, path, None, true).await
745 }
746
747 pub async fn http_get_trade_fee(
759 &self,
760 params: GetTradeFeeParams,
761 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
762 let path = Self::build_path("/api/v5/account/trade-fee", ¶ms)?;
763 self.send_request(Method::GET, &path, None, true).await
764 }
765
766 pub async fn http_get_order_history(
776 &self,
777 params: GetOrderHistoryParams,
778 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
779 let path = Self::build_path("/api/v5/trade/orders-history", ¶ms)?;
780 self.send_request(Method::GET, &path, None, true).await
781 }
782
783 pub async fn http_get_order_list(
793 &self,
794 params: GetOrderListParams,
795 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
796 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
797 self.send_request(Method::GET, &path, None, true).await
798 }
799
800 pub async fn http_get_order_algo_pending(
806 &self,
807 params: GetAlgoOrdersParams,
808 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
809 let path = Self::build_path("/api/v5/trade/order-algo-pending", ¶ms)?;
810 self.send_request(Method::GET, &path, None, true).await
811 }
812
813 pub async fn http_get_order_algo_history(
819 &self,
820 params: GetAlgoOrdersParams,
821 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
822 let path = Self::build_path("/api/v5/trade/order-algo-history", ¶ms)?;
823 self.send_request(Method::GET, &path, None, true).await
824 }
825
826 pub async fn http_get_positions(
838 &self,
839 params: GetPositionsParams,
840 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
841 let path = Self::build_path("/api/v5/account/positions", ¶ms)?;
842 self.send_request(Method::GET, &path, None, true).await
843 }
844
845 pub async fn http_get_position_history(
855 &self,
856 params: GetPositionsHistoryParams,
857 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
858 let path = Self::build_path("/api/v5/account/positions-history", ¶ms)?;
859 self.send_request(Method::GET, &path, None, true).await
860 }
861
862 pub async fn http_get_transaction_details(
872 &self,
873 params: GetTransactionDetailsParams,
874 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
875 let path = Self::build_path("/api/v5/trade/fills", ¶ms)?;
876 self.send_request(Method::GET, &path, None, true).await
877 }
878}
879
/// High-level OKX HTTP client that wraps [`OKXHttpInnerClient`] and keeps a cache
/// of instruments used when parsing responses.
///
/// Cloning shares the inner client and the instrument cache through `Arc`.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared low-level client that performs the raw HTTP requests.
    pub(crate) inner: Arc<OKXHttpInnerClient>,
    /// Instruments keyed by their raw symbol.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    /// Set to `true` once instruments have been added through this handle.
    cache_initialized: bool,
}
894
895impl Default for OKXHttpClient {
896 fn default() -> Self {
897 Self::new(None, Some(60), None, None, None, false)
898 .expect("Failed to create default OKXHttpClient")
899 }
900}
901
902impl OKXHttpClient {
903 pub fn new(
913 base_url: Option<String>,
914 timeout_secs: Option<u64>,
915 max_retries: Option<u32>,
916 retry_delay_ms: Option<u64>,
917 retry_delay_max_ms: Option<u64>,
918 is_demo: bool,
919 ) -> anyhow::Result<Self> {
920 Ok(Self {
921 inner: Arc::new(OKXHttpInnerClient::new(
922 base_url,
923 timeout_secs,
924 max_retries,
925 retry_delay_ms,
926 retry_delay_max_ms,
927 is_demo,
928 )?),
929 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
930 cache_initialized: false,
931 })
932 }
933
934 pub fn from_env() -> anyhow::Result<Self> {
941 Self::with_credentials(None, None, None, None, None, None, None, None, false)
942 }
943
944 #[allow(clippy::too_many_arguments)]
951 pub fn with_credentials(
952 api_key: Option<String>,
953 api_secret: Option<String>,
954 api_passphrase: Option<String>,
955 base_url: Option<String>,
956 timeout_secs: Option<u64>,
957 max_retries: Option<u32>,
958 retry_delay_ms: Option<u64>,
959 retry_delay_max_ms: Option<u64>,
960 is_demo: bool,
961 ) -> anyhow::Result<Self> {
962 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
963 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
964 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
965 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
966
967 Ok(Self {
968 inner: Arc::new(OKXHttpInnerClient::with_credentials(
969 api_key,
970 api_secret,
971 api_passphrase,
972 base_url,
973 timeout_secs,
974 max_retries,
975 retry_delay_ms,
976 retry_delay_max_ms,
977 is_demo,
978 )?),
979 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
980 cache_initialized: false,
981 })
982 }
983
984 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
990 self.instruments_cache
991 .lock()
992 .expect("`instruments_cache` lock poisoned")
993 .get(&symbol)
994 .cloned()
995 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
996 }
997
998 async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
999 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1000 return Ok(inst);
1001 }
1002
1003 for group in [
1004 OKXInstrumentType::Spot,
1005 OKXInstrumentType::Margin,
1006 OKXInstrumentType::Futures,
1007 ] {
1008 if let Ok(instruments) = self.request_instruments(group, None).await {
1009 let mut guard = self.instruments_cache.lock().unwrap();
1010 for inst in instruments {
1011 guard.insert(inst.raw_symbol().inner(), inst);
1012 }
1013 drop(guard);
1014
1015 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
1016 return Ok(inst);
1017 }
1018 }
1019 }
1020
1021 anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
1022 }
1023
1024 pub fn cancel_all_requests(&self) {
1026 self.inner.cancel_all_requests();
1027 }
1028
1029 pub fn cancellation_token(&self) -> &CancellationToken {
1031 self.inner.cancellation_token()
1032 }
1033
1034 pub fn base_url(&self) -> &str {
1036 self.inner.base_url.as_str()
1037 }
1038
1039 pub fn api_key(&self) -> Option<&str> {
1041 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
1042 }
1043
1044 #[must_use]
1046 pub fn is_demo(&self) -> bool {
1047 self.inner.is_demo
1048 }
1049
1050 pub async fn http_get_server_time(&self) -> Result<u64, OKXHttpError> {
1058 self.inner.http_get_server_time().await
1059 }
1060
1061 #[must_use]
1065 pub const fn is_initialized(&self) -> bool {
1066 self.cache_initialized
1067 }
1068
1069 fn generate_ts_init(&self) -> UnixNanos {
1071 get_atomic_clock_realtime().get_time_ns()
1072 }
1073
1074 #[must_use]
1076 pub fn get_cached_symbols(&self) -> Vec<String> {
1084 self.instruments_cache
1085 .lock()
1086 .unwrap()
1087 .keys()
1088 .map(std::string::ToString::to_string)
1089 .collect()
1090 }
1091
1092 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
1101 for inst in instruments {
1102 self.instruments_cache
1103 .lock()
1104 .unwrap()
1105 .insert(inst.raw_symbol().inner(), inst);
1106 }
1107 self.cache_initialized = true;
1108 }
1109
1110 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
1119 self.instruments_cache
1120 .lock()
1121 .unwrap()
1122 .insert(instrument.raw_symbol().inner(), instrument);
1123 self.cache_initialized = true;
1124 }
1125
1126 pub async fn request_account_state(
1132 &self,
1133 account_id: AccountId,
1134 ) -> anyhow::Result<AccountState> {
1135 let resp = self
1136 .inner
1137 .http_get_balance()
1138 .await
1139 .map_err(|e| anyhow::anyhow!(e))?;
1140
1141 let ts_init = self.generate_ts_init();
1142 let raw = resp
1143 .first()
1144 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1145 let account_state = parse_account_state(raw, account_id, ts_init)?;
1146
1147 Ok(account_state)
1148 }
1149
1150 pub async fn request_vip_level(&self) -> anyhow::Result<Option<OKXVipLevel>> {
1159 let simple_types = [OKXInstrumentType::Spot, OKXInstrumentType::Margin];
1162
1163 for inst_type in simple_types {
1164 let params = GetTradeFeeParams {
1165 inst_type,
1166 inst_family: None,
1167 uly: None,
1168 };
1169
1170 match self.inner.http_get_trade_fee(params).await {
1171 Ok(resp) => {
1172 if let Some(fee_rate) = resp.first() {
1173 tracing::info!("Detected OKX VIP level: {}", fee_rate.level);
1174 return Ok(Some(fee_rate.level));
1175 }
1176 }
1177 Err(e) => {
1178 tracing::debug!(
1179 "Failed to query fee rates for {inst_type:?}: {e}, trying next type"
1180 );
1181 continue;
1182 }
1183 }
1184 }
1185
1186 let derivatives_types = [
1188 OKXInstrumentType::Swap,
1189 OKXInstrumentType::Futures,
1190 OKXInstrumentType::Option,
1191 ];
1192
1193 let inst_families = ["BTC-USD", "ETH-USD", "BTC-USDT", "ETH-USDT"];
1195
1196 for inst_type in derivatives_types {
1197 for family in inst_families {
1198 let params = GetTradeFeeParams {
1199 inst_type,
1200 inst_family: Some(family.to_string()),
1201 uly: None,
1202 };
1203
1204 match self.inner.http_get_trade_fee(params).await {
1205 Ok(resp) => {
1206 if let Some(fee_rate) = resp.first() {
1207 tracing::info!("Detected OKX VIP level: {}", fee_rate.level);
1208 return Ok(Some(fee_rate.level));
1209 }
1210 }
1211 Err(e) => {
1212 tracing::debug!(
1213 "Failed to query fee rates for {inst_type:?} family {family}: {e}"
1214 );
1215 continue;
1216 }
1217 }
1218 }
1219 }
1220
1221 tracing::warn!("Unable to query VIP level from any instrument type or family");
1222 Ok(None)
1223 }
1224
1225 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1238 let mut params = SetPositionModeParamsBuilder::default();
1239 params.pos_mode(position_mode);
1240 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1241
1242 match self.inner.http_set_position_mode(params).await {
1243 Ok(_) => Ok(()),
1244 Err(e) => {
1245 if let OKXHttpError::OkxError {
1246 error_code,
1247 message,
1248 } = &e
1249 && error_code == "50115"
1250 {
1251 tracing::warn!(
1252 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1253 );
1254 return Ok(()); }
1256 anyhow::bail!(e)
1257 }
1258 }
1259 }
1260
1261 pub async fn request_instruments(
1267 &self,
1268 instrument_type: OKXInstrumentType,
1269 instrument_family: Option<String>,
1270 ) -> anyhow::Result<Vec<InstrumentAny>> {
1271 let mut params = GetInstrumentsParamsBuilder::default();
1272 params.inst_type(instrument_type);
1273
1274 if let Some(family) = instrument_family {
1275 params.inst_family(family);
1276 }
1277
1278 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1279
1280 let resp = self
1281 .inner
1282 .http_get_instruments(params)
1283 .await
1284 .map_err(|e| anyhow::anyhow!(e))?;
1285
1286 let ts_init = self.generate_ts_init();
1287
1288 let mut instruments: Vec<InstrumentAny> = Vec::new();
1289 for inst in &resp {
1290 if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
1291 instruments.push(instrument_any);
1292 }
1293 }
1294
1295 Ok(instruments)
1296 }
1297
1298 pub async fn request_mark_price(
1304 &self,
1305 instrument_id: InstrumentId,
1306 ) -> anyhow::Result<MarkPriceUpdate> {
1307 let mut params = GetMarkPriceParamsBuilder::default();
1308 params.inst_id(instrument_id.symbol.inner());
1309 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1310
1311 let resp = self
1312 .inner
1313 .http_get_mark_price(params)
1314 .await
1315 .map_err(|e| anyhow::anyhow!(e))?;
1316
1317 let raw = resp
1318 .first()
1319 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1320 let inst = self
1321 .instrument_or_fetch(instrument_id.symbol.inner())
1322 .await?;
1323 let ts_init = self.generate_ts_init();
1324
1325 let mark_price =
1326 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1327 .map_err(|e| anyhow::anyhow!(e))?;
1328 Ok(mark_price)
1329 }
1330
1331 pub async fn request_index_price(
1337 &self,
1338 instrument_id: InstrumentId,
1339 ) -> anyhow::Result<IndexPriceUpdate> {
1340 let mut params = GetIndexTickerParamsBuilder::default();
1341 params.inst_id(instrument_id.symbol.inner());
1342 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1343
1344 let resp = self
1345 .inner
1346 .http_get_index_ticker(params)
1347 .await
1348 .map_err(|e| anyhow::anyhow!(e))?;
1349
1350 let raw = resp
1351 .first()
1352 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1353 let inst = self
1354 .instrument_or_fetch(instrument_id.symbol.inner())
1355 .await?;
1356 let ts_init = self.generate_ts_init();
1357
1358 let index_price =
1359 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1360 .map_err(|e| anyhow::anyhow!(e))?;
1361 Ok(index_price)
1362 }
1363
1364 pub async fn request_trades(
1370 &self,
1371 instrument_id: InstrumentId,
1372 start: Option<DateTime<Utc>>,
1373 end: Option<DateTime<Utc>>,
1374 limit: Option<u32>,
1375 ) -> anyhow::Result<Vec<TradeTick>> {
1376 let mut params = GetTradesParamsBuilder::default();
1377
1378 params.inst_id(instrument_id.symbol.inner());
1379 if let Some(s) = start {
1380 params.after(s.timestamp_millis().to_string());
1381 }
1382 if let Some(e) = end {
1383 params.before(e.timestamp_millis().to_string());
1384 }
1385 const OKX_TRADES_MAX_LIMIT: u32 = 100;
1389 if let Some(l) = limit
1390 && l > 0
1391 {
1392 params.limit(l.min(OKX_TRADES_MAX_LIMIT));
1393 }
1394
1395 let params = params.build().map_err(anyhow::Error::new)?;
1396
1397 let raw_trades = self
1399 .inner
1400 .http_get_trades(params)
1401 .await
1402 .map_err(anyhow::Error::new)?;
1403
1404 let ts_init = self.generate_ts_init();
1405 let inst = self
1406 .instrument_or_fetch(instrument_id.symbol.inner())
1407 .await?;
1408
1409 let mut trades = Vec::with_capacity(raw_trades.len());
1410 for raw in raw_trades {
1411 match parse_trade_tick(
1412 &raw,
1413 instrument_id,
1414 inst.price_precision(),
1415 inst.size_precision(),
1416 ts_init,
1417 ) {
1418 Ok(trade) => trades.push(trade),
1419 Err(e) => tracing::error!("{e}"),
1420 }
1421 }
1422
1423 Ok(trades)
1424 }
1425
1426 pub async fn request_bars(
1471 &self,
1472 bar_type: BarType,
1473 start: Option<DateTime<Utc>>,
1474 mut end: Option<DateTime<Utc>>,
1475 limit: Option<u32>,
1476 ) -> anyhow::Result<Vec<Bar>> {
1477 const HISTORY_SPLIT_DAYS: i64 = 100;
1478 const MAX_PAGES_SOFT: usize = 500;
1479
1480 let limit = if limit == Some(0) { None } else { limit };
1481
1482 anyhow::ensure!(
1483 bar_type.aggregation_source() == AggregationSource::External,
1484 "Only EXTERNAL aggregation is supported"
1485 );
1486 if let (Some(s), Some(e)) = (start, end) {
1487 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1488 }
1489
1490 let now = Utc::now();
1491 if let Some(s) = start
1492 && s > now
1493 {
1494 return Ok(Vec::new());
1495 }
1496 if let Some(e) = end
1497 && e > now
1498 {
1499 end = Some(now);
1500 }
1501
1502 let spec = bar_type.spec();
1503 let step = spec.step.get();
1504 let bar_param = match spec.aggregation {
1505 BarAggregation::Second => format!("{step}s"),
1506 BarAggregation::Minute => format!("{step}m"),
1507 BarAggregation::Hour => format!("{step}H"),
1508 BarAggregation::Day => format!("{step}D"),
1509 BarAggregation::Week => format!("{step}W"),
1510 BarAggregation::Month => format!("{step}M"),
1511 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1512 };
1513
1514 let slot_ms: i64 = match spec.aggregation {
1515 BarAggregation::Second => (step as i64) * 1_000,
1516 BarAggregation::Minute => (step as i64) * 60_000,
1517 BarAggregation::Hour => (step as i64) * 3_600_000,
1518 BarAggregation::Day => (step as i64) * 86_400_000,
1519 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1520 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1521 _ => unreachable!("Unsupported aggregation should have been caught above"),
1522 };
1523 let slot_ns: i64 = slot_ms * 1_000_000;
1524
1525 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1526 enum Mode {
1527 Latest,
1528 Backward,
1529 Range,
1530 }
1531
1532 let mode = match (start, end) {
1533 (None, None) => Mode::Latest,
1534 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
1536 (Some(_), Some(_)) => Mode::Range,
1537 };
1538
1539 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1540 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1541
1542 let start_ms = start.map(|s| {
1544 let ms = s.timestamp_millis();
1545 if slot_ms > 0 {
1546 (ms / slot_ms) * slot_ms } else {
1548 ms
1549 }
1550 });
1551 let end_ms = end.map(|e| {
1552 let ms = e.timestamp_millis();
1553 if slot_ms > 0 {
1554 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
1556 ms
1557 }
1558 });
1559 let now_ms = now.timestamp_millis();
1560
1561 let symbol = bar_type.instrument_id().symbol;
1562 let inst = self.instrument_or_fetch(symbol.inner()).await?;
1563
1564 let mut out: Vec<Bar> = Vec::new();
1565 let mut pages = 0usize;
1566
1567 let mut after_ms: Option<i64> = None;
1572 let mut before_ms: Option<i64> = match mode {
1573 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1574 Mode::Range => {
1575 Some(end_ms.unwrap_or(now_ms))
1578 }
1579 Mode::Latest => None,
1580 };
1581
1582 let mut forward_prepend_mode = matches!(mode, Mode::Range);
1584
1585 if matches!(mode, Mode::Backward | Mode::Range)
1589 && let Some(b) = before_ms
1590 {
1591 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
1597 before_ms = Some(now_ms.saturating_sub(buffer_ms));
1598 }
1599 }
1600
1601 let mut have_latest_first_page = false;
1602 let mut progressless_loops = 0u8;
1603
1604 loop {
1605 if let Some(lim) = limit
1606 && lim > 0
1607 && out.len() >= lim as usize
1608 {
1609 break;
1610 }
1611 if pages >= MAX_PAGES_SOFT {
1612 break;
1613 }
1614
1615 let pivot_ms = if let Some(a) = after_ms {
1616 a
1617 } else if let Some(b) = before_ms {
1618 b
1619 } else {
1620 now_ms
1621 };
1622 let age_ms = now_ms.saturating_sub(pivot_ms);
1628 let age_hours = age_ms / (60 * 60 * 1000);
1629 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
1632 let remaining = limit
1633 .filter(|&l| l > 0) .map(|l| (l as usize).saturating_sub(out.len()))
1635 .unwrap_or(page_ceiling);
1636 let page_cap = remaining.min(page_ceiling);
1637
1638 let mut p = GetCandlesticksParamsBuilder::default();
1639 p.inst_id(symbol.as_str())
1640 .bar(&bar_param)
1641 .limit(page_cap as u32);
1642
1643 let mut req_used_before = false;
1645
1646 match mode {
1647 Mode::Latest => {
1648 if have_latest_first_page && let Some(b) = before_ms {
1649 p.before_ms(b);
1650 req_used_before = true;
1651 }
1652 }
1653 Mode::Backward => {
1654 if let Some(b) = before_ms {
1655 p.before_ms(b);
1656 req_used_before = true;
1657 }
1658 }
1659 Mode::Range => {
1660 if pages == 0 && !using_history {
1663 } else if forward_prepend_mode {
1666 if let Some(b) = before_ms {
1667 p.before_ms(b);
1668 req_used_before = true;
1669 }
1670 } else if let Some(a) = after_ms {
1671 p.after_ms(a);
1672 }
1673 }
1674 }
1675
1676 let params = p.build().map_err(anyhow::Error::new)?;
1677
1678 let mut raw = if using_history {
1679 self.inner
1680 .http_get_candlesticks_history(params.clone())
1681 .await
1682 .map_err(anyhow::Error::new)?
1683 } else {
1684 self.inner
1685 .http_get_candlesticks(params.clone())
1686 .await
1687 .map_err(anyhow::Error::new)?
1688 };
1689
1690 if raw.is_empty() {
1692 if matches!(mode, Mode::Latest)
1694 && have_latest_first_page
1695 && !using_history
1696 && let Some(b) = before_ms
1697 {
1698 let mut p2 = GetCandlesticksParamsBuilder::default();
1699 p2.inst_id(symbol.as_str())
1700 .bar(&bar_param)
1701 .limit(page_cap as u32);
1702 p2.before_ms(b);
1703 let params2 = p2.build().map_err(anyhow::Error::new)?;
1704 let raw2 = self
1705 .inner
1706 .http_get_candlesticks_history(params2)
1707 .await
1708 .map_err(anyhow::Error::new)?;
1709 if !raw2.is_empty() {
1710 raw = raw2;
1711 } else {
1712 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1714 before_ms = Some(b.saturating_sub(jump));
1715 progressless_loops = progressless_loops.saturating_add(1);
1716 if progressless_loops >= 3 {
1717 break;
1718 }
1719 continue;
1720 }
1721 }
1722
1723 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1727 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1728 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1729
1730 let mut p2 = GetCandlesticksParamsBuilder::default();
1731 p2.inst_id(symbol.as_str())
1732 .bar(&bar_param)
1733 .limit(page_cap as u32)
1734 .before_ms(pivot_back);
1735 let params2 = p2.build().map_err(anyhow::Error::new)?;
1736 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1737 > HISTORY_SPLIT_DAYS
1738 {
1739 self.inner.http_get_candlesticks_history(params2).await
1740 } else {
1741 self.inner.http_get_candlesticks(params2).await
1742 }
1743 .map_err(anyhow::Error::new)?;
1744 if raw2.is_empty() {
1745 break;
1746 } else {
1747 raw = raw2;
1748 forward_prepend_mode = true;
1749 req_used_before = true;
1750 }
1751 }
1752
1753 if raw.is_empty()
1755 && matches!(mode, Mode::Latest)
1756 && !have_latest_first_page
1757 && !using_history
1758 {
1759 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1760 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1761 have_latest_first_page = true;
1762 continue;
1763 }
1764
1765 if raw.is_empty() {
1767 break;
1768 }
1769 }
1770 pages += 1;
1773
1774 let ts_init = self.generate_ts_init();
1776 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1777 for r in &raw {
1778 page.push(parse_candlestick(
1779 r,
1780 bar_type,
1781 inst.price_precision(),
1782 inst.size_precision(),
1783 ts_init,
1784 )?);
1785 }
1786 page.reverse();
1787
1788 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1789 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1790
1791 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1795 && out.is_empty()
1796 && pages < 2
1797 {
1798 let tolerance_ns = slot_ns * 2; if !page.is_empty() {
1805 tracing::debug!(
1806 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1807 page.len(),
1808 page.first().unwrap().ts_event.as_i64() / 1_000_000,
1809 page.last().unwrap().ts_event.as_i64() / 1_000_000,
1810 start_ms,
1811 end_ms
1812 );
1813 }
1814
1815 let result: Vec<Bar> = page
1816 .clone()
1817 .into_iter()
1818 .filter(|b| {
1819 let ts = b.ts_event.as_i64();
1820 let ok_after =
1822 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1823 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1824 ok_after && ok_before
1825 })
1826 .collect();
1827
1828 result
1829 } else {
1830 page.clone()
1832 .into_iter()
1833 .filter(|b| {
1834 let ts = b.ts_event.as_i64();
1835 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1836 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1837 ok_after && ok_before
1838 })
1839 .collect()
1840 };
1841
1842 if !page.is_empty() && filtered.is_empty() {
1843 if matches!(mode, Mode::Range)
1845 && !forward_prepend_mode
1846 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1847 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1848 {
1849 break;
1851 }
1852 }
1853
1854 let contribution;
1856
1857 if out.is_empty() {
1858 contribution = filtered.len();
1859 out = filtered;
1860 } else {
1861 match mode {
1862 Mode::Backward | Mode::Latest => {
1863 if let Some(first) = out.first() {
1864 filtered.retain(|b| b.ts_event < first.ts_event);
1865 }
1866 contribution = filtered.len();
1867 if contribution != 0 {
1868 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1869 new_out.extend_from_slice(&filtered);
1870 new_out.extend_from_slice(&out);
1871 out = new_out;
1872 }
1873 }
1874 Mode::Range => {
1875 if forward_prepend_mode || req_used_before {
1876 if let Some(first) = out.first() {
1878 filtered.retain(|b| b.ts_event < first.ts_event);
1879 }
1880 contribution = filtered.len();
1881 if contribution != 0 {
1882 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1883 new_out.extend_from_slice(&filtered);
1884 new_out.extend_from_slice(&out);
1885 out = new_out;
1886 }
1887 } else {
1888 if let Some(last) = out.last() {
1890 filtered.retain(|b| b.ts_event > last.ts_event);
1891 }
1892 contribution = filtered.len();
1893 out.extend(filtered);
1894 }
1895 }
1896 }
1897 }
1898
1899 if contribution == 0
1901 && matches!(mode, Mode::Latest | Mode::Backward)
1902 && let Some(b) = before_ms
1903 {
1904 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1905 let new_b = b.saturating_sub(jump);
1906 if new_b != b {
1907 before_ms = Some(new_b);
1908 }
1909 }
1910
1911 if contribution == 0 {
1912 progressless_loops = progressless_loops.saturating_add(1);
1913 if progressless_loops >= 3 {
1914 break;
1915 }
1916 } else {
1917 progressless_loops = 0;
1918
1919 match mode {
1921 Mode::Latest | Mode::Backward => {
1922 if let Some(oldest) = page_oldest_ms {
1923 before_ms = Some(oldest.saturating_sub(1));
1924 have_latest_first_page = true;
1925 } else {
1926 break;
1927 }
1928 }
1929 Mode::Range => {
1930 if forward_prepend_mode || req_used_before {
1931 if let Some(oldest) = page_oldest_ms {
1932 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
1935 after_ms = None;
1936 } else {
1937 break;
1938 }
1939 } else if let Some(newest) = page_newest_ms {
1940 after_ms = Some(newest.saturating_add(1));
1941 before_ms = None;
1942 } else {
1943 break;
1944 }
1945 }
1946 }
1947 }
1948
1949 if let Some(lim) = limit
1951 && lim > 0
1952 && out.len() >= lim as usize
1953 {
1954 break;
1955 }
1956 if let Some(ens) = end_ns
1957 && let Some(last) = out.last()
1958 && last.ts_event.as_i64() >= ens
1959 {
1960 break;
1961 }
1962 if let Some(sns) = start_ns
1963 && let Some(first) = out.first()
1964 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1965 && first.ts_event.as_i64() <= sns
1966 {
1967 if matches!(mode, Mode::Range) {
1969 if let Some(ens) = end_ns
1971 && let Some(last) = out.last()
1972 {
1973 let last_ts = last.ts_event.as_i64();
1974 if last_ts < ens {
1975 forward_prepend_mode = false;
1978 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1979 before_ms = None;
1980 continue;
1981 }
1982 }
1983 }
1984 break;
1985 }
1986
1987 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1988 }
1989
1990 if out.is_empty() && matches!(mode, Mode::Range) {
1992 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1993 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1994 let mut p = GetCandlesticksParamsBuilder::default();
1995 p.inst_id(symbol.as_str())
1996 .bar(&bar_param)
1997 .limit(300)
1998 .before_ms(pivot);
1999 let params = p.build().map_err(anyhow::Error::new)?;
2000 let raw = if hist {
2001 self.inner.http_get_candlesticks_history(params).await
2002 } else {
2003 self.inner.http_get_candlesticks(params).await
2004 }
2005 .map_err(anyhow::Error::new)?;
2006 if !raw.is_empty() {
2007 let ts_init = self.generate_ts_init();
2008 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2009 for r in &raw {
2010 page.push(parse_candlestick(
2011 r,
2012 bar_type,
2013 inst.price_precision(),
2014 inst.size_precision(),
2015 ts_init,
2016 )?);
2017 }
2018 page.reverse();
2019 out = page
2020 .into_iter()
2021 .filter(|b| {
2022 let ts = b.ts_event.as_i64();
2023 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2024 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2025 ok_after && ok_before
2026 })
2027 .collect();
2028 }
2029 }
2030
2031 if let Some(ens) = end_ns {
2033 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
2034 out.pop();
2035 }
2036 }
2037
2038 if matches!(mode, Mode::Range)
2040 && !forward_prepend_mode
2041 && let Some(sns) = start_ns
2042 {
2043 let lower = sns.saturating_sub(slot_ns);
2044 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
2045 out.remove(0);
2046 }
2047 }
2048
2049 if let Some(lim) = limit
2050 && lim > 0
2051 && out.len() > lim as usize
2052 {
2053 out.truncate(lim as usize);
2054 }
2055
2056 Ok(out)
2057 }
2058
2059 #[allow(clippy::too_many_arguments)]
2070 pub async fn request_order_status_reports(
2071 &self,
2072 account_id: AccountId,
2073 instrument_type: Option<OKXInstrumentType>,
2074 instrument_id: Option<InstrumentId>,
2075 start: Option<DateTime<Utc>>,
2076 end: Option<DateTime<Utc>>,
2077 open_only: bool,
2078 limit: Option<u32>,
2079 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2080 let mut history_params = GetOrderHistoryParamsBuilder::default();
2081
2082 let instrument_type = if let Some(instrument_type) = instrument_type {
2083 instrument_type
2084 } else {
2085 let instrument_id = instrument_id.ok_or_else(|| {
2086 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2087 })?;
2088 let instrument = self
2089 .instrument_or_fetch(instrument_id.symbol.inner())
2090 .await?;
2091 okx_instrument_type(&instrument)?
2092 };
2093
2094 history_params.inst_type(instrument_type);
2095
2096 if let Some(instrument_id) = instrument_id.as_ref() {
2097 history_params.inst_id(instrument_id.symbol.inner().to_string());
2098 }
2099
2100 if let Some(limit) = limit {
2101 history_params.limit(limit);
2102 }
2103
2104 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
2105
2106 let mut pending_params = GetOrderListParamsBuilder::default();
2107 pending_params.inst_type(instrument_type);
2108
2109 if let Some(instrument_id) = instrument_id.as_ref() {
2110 pending_params.inst_id(instrument_id.symbol.inner().to_string());
2111 }
2112
2113 if let Some(limit) = limit {
2114 pending_params.limit(limit);
2115 }
2116
2117 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
2118
2119 let combined_resp = if open_only {
2120 self.inner
2122 .http_get_order_list(pending_params)
2123 .await
2124 .map_err(|e| anyhow::anyhow!(e))?
2125 } else {
2126 let (history_resp, pending_resp) = tokio::try_join!(
2128 self.inner.http_get_order_history(history_params),
2129 self.inner.http_get_order_list(pending_params)
2130 )
2131 .map_err(|e| anyhow::anyhow!(e))?;
2132
2133 let mut combined_resp = history_resp;
2135 combined_resp.extend(pending_resp);
2136 combined_resp
2137 };
2138
2139 let start_ns = start.map(UnixNanos::from);
2141 let end_ns = end.map(UnixNanos::from);
2142
2143 let ts_init = self.generate_ts_init();
2144 let mut reports = Vec::with_capacity(combined_resp.len());
2145
2146 let mut seen: AHashSet<String> = AHashSet::new();
2148
2149 for order in combined_resp {
2150 let seen_key = if !order.cl_ord_id.is_empty() {
2151 order.cl_ord_id.as_str().to_string()
2152 } else if let Some(algo_cl_ord_id) = order
2153 .algo_cl_ord_id
2154 .as_ref()
2155 .filter(|value| !value.as_str().is_empty())
2156 {
2157 algo_cl_ord_id.as_str().to_string()
2158 } else if let Some(algo_id) = order
2159 .algo_id
2160 .as_ref()
2161 .filter(|value| !value.as_str().is_empty())
2162 {
2163 algo_id.as_str().to_string()
2164 } else {
2165 order.ord_id.as_str().to_string()
2166 };
2167
2168 if !seen.insert(seen_key) {
2169 continue; }
2171
2172 let inst = self.instrument_or_fetch(order.inst_id).await?;
2173
2174 let report = parse_order_status_report(
2175 &order,
2176 account_id,
2177 inst.id(),
2178 inst.price_precision(),
2179 inst.size_precision(),
2180 ts_init,
2181 );
2182
2183 if let Some(start_ns) = start_ns
2184 && report.ts_last < start_ns
2185 {
2186 continue;
2187 }
2188 if let Some(end_ns) = end_ns
2189 && report.ts_last > end_ns
2190 {
2191 continue;
2192 }
2193
2194 reports.push(report);
2195 }
2196
2197 Ok(reports)
2198 }
2199
2200 pub async fn request_fill_reports(
2210 &self,
2211 account_id: AccountId,
2212 instrument_type: Option<OKXInstrumentType>,
2213 instrument_id: Option<InstrumentId>,
2214 start: Option<DateTime<Utc>>,
2215 end: Option<DateTime<Utc>>,
2216 limit: Option<u32>,
2217 ) -> anyhow::Result<Vec<FillReport>> {
2218 let mut params = GetTransactionDetailsParamsBuilder::default();
2219
2220 let instrument_type = if let Some(instrument_type) = instrument_type {
2221 instrument_type
2222 } else {
2223 let instrument_id = instrument_id.ok_or_else(|| {
2224 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2225 })?;
2226 let instrument = self
2227 .instrument_or_fetch(instrument_id.symbol.inner())
2228 .await?;
2229 okx_instrument_type(&instrument)?
2230 };
2231
2232 params.inst_type(instrument_type);
2233
2234 if let Some(instrument_id) = instrument_id {
2235 let instrument = self
2236 .instrument_or_fetch(instrument_id.symbol.inner())
2237 .await?;
2238 let instrument_type = okx_instrument_type(&instrument)?;
2239 params.inst_type(instrument_type);
2240 params.inst_id(instrument_id.symbol.inner().to_string());
2241 }
2242
2243 if let Some(limit) = limit {
2244 params.limit(limit);
2245 }
2246
2247 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2248
2249 let resp = self
2250 .inner
2251 .http_get_transaction_details(params)
2252 .await
2253 .map_err(|e| anyhow::anyhow!(e))?;
2254
2255 let start_ns = start.map(UnixNanos::from);
2257 let end_ns = end.map(UnixNanos::from);
2258
2259 let ts_init = self.generate_ts_init();
2260 let mut reports = Vec::with_capacity(resp.len());
2261
2262 for detail in resp {
2263 let inst = self.instrument_or_fetch(detail.inst_id).await?;
2264
2265 let report = parse_fill_report(
2266 detail,
2267 account_id,
2268 inst.id(),
2269 inst.price_precision(),
2270 inst.size_precision(),
2271 ts_init,
2272 )?;
2273
2274 if let Some(start_ns) = start_ns
2275 && report.ts_event < start_ns
2276 {
2277 continue;
2278 }
2279
2280 if let Some(end_ns) = end_ns
2281 && report.ts_event > end_ns
2282 {
2283 continue;
2284 }
2285
2286 reports.push(report);
2287 }
2288
2289 Ok(reports)
2290 }
2291
2292 pub async fn request_position_status_reports(
2302 &self,
2303 account_id: AccountId,
2304 instrument_type: Option<OKXInstrumentType>,
2305 instrument_id: Option<InstrumentId>,
2306 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2307 let mut params = GetPositionsParamsBuilder::default();
2308
2309 let instrument_type = if let Some(instrument_type) = instrument_type {
2310 instrument_type
2311 } else {
2312 let instrument_id = instrument_id.ok_or_else(|| {
2313 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
2314 })?;
2315 let instrument = self
2316 .instrument_or_fetch(instrument_id.symbol.inner())
2317 .await?;
2318 okx_instrument_type(&instrument)?
2319 };
2320
2321 params.inst_type(instrument_type);
2322
2323 instrument_id
2324 .as_ref()
2325 .map(|i| params.inst_id(i.symbol.inner()));
2326
2327 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2328
2329 let resp = self
2330 .inner
2331 .http_get_positions(params)
2332 .await
2333 .map_err(|e| anyhow::anyhow!(e))?;
2334
2335 let ts_init = self.generate_ts_init();
2336 let mut reports = Vec::with_capacity(resp.len());
2337
2338 for position in resp {
2339 let inst = self.instrument_or_fetch(position.inst_id).await?;
2340
2341 let report = parse_position_status_report(
2342 position,
2343 account_id,
2344 inst.id(),
2345 inst.size_precision(),
2346 ts_init,
2347 )?;
2348 reports.push(report);
2349 }
2350
2351 Ok(reports)
2352 }
2353
2354 pub async fn place_algo_order(
2364 &self,
2365 request: OKXPlaceAlgoOrderRequest,
2366 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2367 let body =
2368 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2369
2370 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
2371 .inner
2372 .send_request(Method::POST, "/api/v5/trade/order-algo", Some(body), true)
2373 .await?;
2374
2375 resp.into_iter()
2376 .next()
2377 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2378 }
2379
2380 pub async fn cancel_algo_order(
2390 &self,
2391 request: OKXCancelAlgoOrderRequest,
2392 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2393 let body =
2396 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
2397
2398 let resp: Vec<OKXCancelAlgoOrderResponse> = self
2399 .inner
2400 .send_request(Method::POST, "/api/v5/trade/cancel-algos", Some(body), true)
2401 .await?;
2402
2403 resp.into_iter()
2404 .next()
2405 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
2406 }
2407
2408 #[allow(clippy::too_many_arguments)]
2417 pub async fn place_algo_order_with_domain_types(
2418 &self,
2419 instrument_id: InstrumentId,
2420 td_mode: OKXTradeMode,
2421 client_order_id: ClientOrderId,
2422 order_side: OrderSide,
2423 order_type: OrderType,
2424 quantity: Quantity,
2425 trigger_price: Price,
2426 trigger_type: Option<TriggerType>,
2427 limit_price: Option<Price>,
2428 reduce_only: Option<bool>,
2429 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
2430 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
2431 return Err(OKXHttpError::ValidationError(
2432 "Invalid order side".to_string(),
2433 ));
2434 }
2435 let okx_side: OKXSide = order_side.into();
2436
2437 let trigger_px_type_enum = trigger_type.map(Into::into).unwrap_or(OKXTriggerType::Last);
2439
2440 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
2442 limit_price.map(|p| p.to_string())
2443 } else {
2444 Some("-1".to_string())
2446 };
2447
2448 let request = OKXPlaceAlgoOrderRequest {
2449 inst_id: instrument_id.symbol.as_str().to_string(),
2450 td_mode,
2451 side: okx_side,
2452 ord_type: OKXAlgoOrderType::Trigger, sz: quantity.to_string(),
2454 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
2455 trigger_px: Some(trigger_price.to_string()),
2456 order_px,
2457 trigger_px_type: Some(trigger_px_type_enum),
2458 tgt_ccy: None, pos_side: None, close_position: None,
2461 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
2462 reduce_only,
2463 };
2464
2465 self.place_algo_order(request).await
2466 }
2467
2468 pub async fn cancel_algo_order_with_domain_types(
2477 &self,
2478 instrument_id: InstrumentId,
2479 algo_id: String,
2480 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
2481 let request = OKXCancelAlgoOrderRequest {
2482 inst_id: instrument_id.symbol.to_string(),
2483 algo_id: Some(algo_id),
2484 algo_cl_ord_id: None,
2485 };
2486
2487 self.cancel_algo_order(request).await
2488 }
2489
2490 #[allow(clippy::too_many_arguments)]
2496 pub async fn request_algo_order_status_reports(
2497 &self,
2498 account_id: AccountId,
2499 instrument_type: Option<OKXInstrumentType>,
2500 instrument_id: Option<InstrumentId>,
2501 algo_id: Option<String>,
2502 algo_client_order_id: Option<ClientOrderId>,
2503 state: Option<OKXOrderStatus>,
2504 limit: Option<u32>,
2505 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2506 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
2507
2508 let inst_type = if let Some(inst_type) = instrument_type {
2509 inst_type
2510 } else if let Some(inst_id) = instrument_id {
2511 let instrument = self.instrument_or_fetch(inst_id.symbol.inner()).await?;
2512 let inst_type = okx_instrument_type(&instrument)?;
2513 instruments_cache.insert(inst_id.symbol.inner(), instrument);
2514 inst_type
2515 } else {
2516 anyhow::bail!("instrument_type or instrument_id required for algo order query")
2517 };
2518
2519 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
2520 params_builder.inst_type(inst_type);
2521 if let Some(inst_id) = instrument_id {
2522 params_builder.inst_id(inst_id.symbol.inner().to_string());
2523 }
2524 if let Some(algo_id) = algo_id.as_ref() {
2525 params_builder.algo_id(algo_id.clone());
2526 }
2527 if let Some(client_order_id) = algo_client_order_id.as_ref() {
2528 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
2529 }
2530 if let Some(state) = state {
2531 params_builder.state(state);
2532 }
2533 if let Some(limit) = limit {
2534 params_builder.limit(limit);
2535 }
2536
2537 let params = params_builder
2538 .build()
2539 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
2540
2541 let ts_init = self.generate_ts_init();
2542 let mut reports = Vec::new();
2543 let mut seen: AHashSet<(String, String)> = AHashSet::new();
2544
2545 let pending = match self.inner.http_get_order_algo_pending(params.clone()).await {
2546 Ok(result) => result,
2547 Err(OKXHttpError::UnexpectedStatus { status, .. })
2548 if status == StatusCode::NOT_FOUND =>
2549 {
2550 Vec::new()
2551 }
2552 Err(error) => return Err(error.into()),
2553 };
2554 self.collect_algo_reports(
2555 account_id,
2556 &pending,
2557 &mut instruments_cache,
2558 ts_init,
2559 &mut seen,
2560 &mut reports,
2561 )
2562 .await?;
2563
2564 let history = match self.inner.http_get_order_algo_history(params).await {
2565 Ok(result) => result,
2566 Err(OKXHttpError::UnexpectedStatus { status, .. })
2567 if status == StatusCode::NOT_FOUND =>
2568 {
2569 Vec::new()
2570 }
2571 Err(error) => return Err(error.into()),
2572 };
2573 self.collect_algo_reports(
2574 account_id,
2575 &history,
2576 &mut instruments_cache,
2577 ts_init,
2578 &mut seen,
2579 &mut reports,
2580 )
2581 .await?;
2582
2583 Ok(reports)
2584 }
2585
2586 pub async fn request_algo_order_status_report(
2592 &self,
2593 account_id: AccountId,
2594 instrument_id: InstrumentId,
2595 algo_client_order_id: ClientOrderId,
2596 ) -> anyhow::Result<Option<OrderStatusReport>> {
2597 let reports = self
2598 .request_algo_order_status_reports(
2599 account_id,
2600 None,
2601 Some(instrument_id),
2602 None,
2603 Some(algo_client_order_id),
2604 None,
2605 Some(50_u32),
2606 )
2607 .await?;
2608
2609 Ok(reports.into_iter().next())
2610 }
2611 async fn collect_algo_reports(
2612 &self,
2613 account_id: AccountId,
2614 orders: &[OKXOrderAlgo],
2615 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
2616 ts_init: UnixNanos,
2617 seen: &mut AHashSet<(String, String)>,
2618 reports: &mut Vec<OrderStatusReport>,
2619 ) -> anyhow::Result<()> {
2620 for order in orders {
2621 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
2622 if !seen.insert(key) {
2623 continue;
2624 }
2625
2626 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
2627 instrument.clone()
2628 } else {
2629 let instrument = self.instrument_or_fetch(order.inst_id).await?;
2630 instruments_cache.insert(order.inst_id, instrument.clone());
2631 instrument
2632 };
2633
2634 let report = parse_http_algo_order(order, account_id, &instrument, ts_init)?;
2635 reports.push(report);
2636 }
2637
2638 Ok(())
2639 }
2640}
2641
2642fn parse_http_algo_order(
2643 order: &OKXOrderAlgo,
2644 account_id: AccountId,
2645 instrument: &InstrumentAny,
2646 ts_init: UnixNanos,
2647) -> anyhow::Result<OrderStatusReport> {
2648 let ord_px = if order.ord_px.is_empty() {
2649 "-1".to_string()
2650 } else {
2651 order.ord_px.clone()
2652 };
2653
2654 let reduce_only = if order.reduce_only.is_empty() {
2655 "false".to_string()
2656 } else {
2657 order.reduce_only.clone()
2658 };
2659
2660 let msg = OKXAlgoOrderMsg {
2661 algo_id: order.algo_id.clone(),
2662 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
2663 cl_ord_id: order.cl_ord_id.clone(),
2664 ord_id: order.ord_id.clone(),
2665 inst_id: order.inst_id,
2666 inst_type: order.inst_type,
2667 ord_type: order.ord_type,
2668 state: order.state,
2669 side: order.side,
2670 pos_side: order.pos_side,
2671 sz: order.sz.clone(),
2672 trigger_px: order.trigger_px.clone(),
2673 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
2674 ord_px,
2675 td_mode: order.td_mode,
2676 lever: order.lever.clone(),
2677 reduce_only,
2678 actual_px: order.actual_px.clone(),
2679 actual_sz: order.actual_sz.clone(),
2680 notional_usd: order.notional_usd.clone(),
2681 c_time: order.c_time,
2682 u_time: order.u_time,
2683 trigger_time: order.trigger_time.clone(),
2684 tag: order.tag.clone(),
2685 };
2686
2687 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
2688}