nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers a
20//! *strongly-typed* interface to all exchange endpoints currently required by
21//! NautilusTrader.
22//!
23//! Key responsibilities handled internally:
24//! • Request signing and header composition for private routes (HMAC-SHA256).
25//! • Rate-limiting based on the public OKX specification.
26//! • Zero-copy deserialization of large JSON payloads into domain models.
27//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
28//!
29//! # Quick links to official docs
30//! | Domain                               | OKX reference                                                             |
31//! |--------------------------------------|---------------------------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data>                    |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>                       |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>                       |
35
36use std::{
37    collections::HashMap,
38    fmt::Debug,
39    num::NonZeroU32,
40    sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::AHashSet;
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
47};
48use nautilus_model::{
49    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
50    enums::{AggregationSource, BarAggregation},
51    events::AccountState,
52    identifiers::{AccountId, InstrumentId},
53    instruments::{Instrument, InstrumentAny},
54    reports::{FillReport, OrderStatusReport, PositionStatusReport},
55};
56use nautilus_network::{
57    http::HttpClient,
58    ratelimiter::quota::Quota,
59    retry::{RetryConfig, RetryManager},
60};
61use reqwest::{Method, StatusCode, header::USER_AGENT};
62use serde::{Deserialize, Serialize, de::DeserializeOwned};
63use tokio_util::sync::CancellationToken;
64use ustr::Ustr;
65
66use super::{
67    error::OKXHttpError,
68    models::{
69        OKXAccount, OKXIndexTicker, OKXMarkPrice, OKXOrderHistory, OKXPosition, OKXPositionHistory,
70        OKXPositionTier, OKXTransactionDetail,
71    },
72    query::{
73        GetCandlesticksParams, GetCandlesticksParamsBuilder, GetIndexTickerParams,
74        GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
75        GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOrderHistoryParams,
76        GetOrderHistoryParamsBuilder, GetOrderListParams, GetOrderListParamsBuilder,
77        GetPositionTiersParams, GetPositionsHistoryParams, GetPositionsParams,
78        GetPositionsParamsBuilder, GetTradesParams, GetTradesParamsBuilder,
79        GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder, SetPositionModeParams,
80        SetPositionModeParamsBuilder,
81    },
82};
83use crate::{
84    common::{
85        consts::{OKX_HTTP_URL, should_retry_error_code},
86        credential::Credential,
87        enums::{OKXInstrumentType, OKXPositionMode},
88        models::OKXInstrument,
89        parse::{
90            okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
91            parse_index_price_update, parse_instrument_any, parse_mark_price_update,
92            parse_order_status_report, parse_position_status_report, parse_trade_tick,
93        },
94    },
95    http::{
96        models::{OKXCandlestick, OKXTrade},
97        query::{GetOrderParams, GetPendingOrdersParams},
98    },
99};
100
101const OKX_SUCCESS_CODE: &str = "0";
102
103/// Default OKX REST API rate limit: 500 requests per 2 seconds.
104///
105/// - Sub-account order limit: 1000 requests per 2 seconds.
106/// - Account balance: 10 requests per 2 seconds.
107/// - Account instruments: 20 requests per 2 seconds.
108///
109/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
110/// that should accommodate most use cases while respecting OKX's documented limits.
111pub static OKX_REST_QUOTA: LazyLock<Quota> =
112    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
113
114/// Represents an OKX HTTP response.
115#[derive(Debug, Serialize, Deserialize)]
116pub struct OKXResponse<T> {
117    /// The OKX response code, which is `"0"` for success.
118    pub code: String,
119    /// A message string which can be informational or describe an error cause.
120    pub msg: String,
121    /// The typed data returned by the OKX endpoint.
122    pub data: Vec<T>,
123}
124
125/// Provides a HTTP client for connecting to the [OKX](https://okx.com) REST API.
126///
127/// This client wraps the underlying [`HttpClient`] to handle functionality
128/// specific to OKX, such as request signing (for authenticated endpoints),
129/// forming request URLs, and deserializing responses into specific data models.
130pub struct OKXHttpInnerClient {
131    base_url: String,
132    client: HttpClient,
133    credential: Option<Credential>,
134    retry_manager: RetryManager<OKXHttpError>,
135    cancellation_token: CancellationToken,
136}
137
138impl Default for OKXHttpInnerClient {
139    fn default() -> Self {
140        Self::new(None, Some(60), None, None, None)
141            .expect("Failed to create default OKXHttpInnerClient")
142    }
143}
144
145impl Debug for OKXHttpInnerClient {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        let credential = self.credential.as_ref().map(|_| "<redacted>");
148        f.debug_struct(stringify!(OKXHttpInnerClient))
149            .field("base_url", &self.base_url)
150            .field("credential", &credential)
151            .finish_non_exhaustive()
152    }
153}
154
155impl OKXHttpInnerClient {
156    /// Cancel all pending HTTP requests.
157    pub fn cancel_all_requests(&self) {
158        self.cancellation_token.cancel();
159    }
160
161    /// Get the cancellation token for this client.
162    pub fn cancellation_token(&self) -> &CancellationToken {
163        &self.cancellation_token
164    }
165
166    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
167    /// optionally overridden with a custom base URL.
168    ///
169    /// This version of the client has **no credentials**, so it can only
170    /// call publicly accessible endpoints.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if the retry manager cannot be created.
175    pub fn new(
176        base_url: Option<String>,
177        timeout_secs: Option<u64>,
178        max_retries: Option<u32>,
179        retry_delay_ms: Option<u64>,
180        retry_delay_max_ms: Option<u64>,
181    ) -> Result<Self, OKXHttpError> {
182        let retry_config = RetryConfig {
183            max_retries: max_retries.unwrap_or(3),
184            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
185            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
186            backoff_factor: 2.0,
187            jitter_ms: 1000,
188            operation_timeout_ms: Some(60_000),
189            immediate_first: false,
190            max_elapsed_ms: Some(180_000),
191        };
192
193        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
194            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
195        })?;
196
197        Ok(Self {
198            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
199            client: HttpClient::new(
200                Self::default_headers(),
201                vec![],
202                vec![],
203                Some(*OKX_REST_QUOTA),
204                timeout_secs,
205            ),
206            credential: None,
207            retry_manager,
208            cancellation_token: CancellationToken::new(),
209        })
210    }
211
212    /// Creates a new [`OKXHttpClient`] configured with credentials
213    /// for authenticated requests, optionally using a custom base URL.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the retry manager cannot be created.
218    #[allow(clippy::too_many_arguments)]
219    pub fn with_credentials(
220        api_key: String,
221        api_secret: String,
222        api_passphrase: String,
223        base_url: String,
224        timeout_secs: Option<u64>,
225        max_retries: Option<u32>,
226        retry_delay_ms: Option<u64>,
227        retry_delay_max_ms: Option<u64>,
228    ) -> Result<Self, OKXHttpError> {
229        let retry_config = RetryConfig {
230            max_retries: max_retries.unwrap_or(3),
231            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
232            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
233            backoff_factor: 2.0,
234            jitter_ms: 1000,
235            operation_timeout_ms: Some(60_000),
236            immediate_first: false,
237            max_elapsed_ms: Some(180_000),
238        };
239
240        let retry_manager = RetryManager::new(retry_config).map_err(|e| {
241            OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
242        })?;
243
244        Ok(Self {
245            base_url,
246            client: HttpClient::new(
247                Self::default_headers(),
248                vec![],
249                vec![],
250                Some(*OKX_REST_QUOTA),
251                timeout_secs,
252            ),
253            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
254            retry_manager,
255            cancellation_token: CancellationToken::new(),
256        })
257    }
258
259    /// Builds the default headers to include with each request (e.g., `User-Agent`).
260    fn default_headers() -> HashMap<String, String> {
261        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
262    }
263
264    /// Combine a base path with a `serde_urlencoded` query string if one exists.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the query string serialization fails.
269    fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
270        let query = serde_urlencoded::to_string(params)
271            .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
272        if query.is_empty() {
273            Ok(base.to_owned())
274        } else {
275            Ok(format!("{base}?{query}"))
276        }
277    }
278
279    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
280    ///
281    /// # Errors
282    ///
283    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
284    /// but the request requires authentication.
285    fn sign_request(
286        &self,
287        method: &Method,
288        path: &str,
289        body: Option<&[u8]>,
290    ) -> Result<HashMap<String, String>, OKXHttpError> {
291        let credential = match self.credential.as_ref() {
292            Some(c) => c,
293            None => return Err(OKXHttpError::MissingCredentials),
294        };
295
296        let body_str = body
297            .and_then(|b| String::from_utf8(b.to_vec()).ok())
298            .unwrap_or_default();
299
300        tracing::debug!("{method} {path}");
301
302        let api_key = credential.api_key.clone().to_string();
303        let api_passphrase = credential.api_passphrase.clone();
304        let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string();
305        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
306
307        let mut headers = HashMap::new();
308        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
309        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
310        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
311        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
312
313        Ok(headers)
314    }
315
316    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
317    ///
318    /// Internally, this method handles:
319    /// - Building the URL from `base_url` + `path`.
320    /// - Optionally signing the request.
321    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
322    /// - Retrying with exponential backoff on transient errors.
323    ///
324    /// # Errors
325    ///
326    /// This function will return an error if:
327    /// - The HTTP request fails.
328    /// - Authentication is required but credentials are missing.
329    /// - The response cannot be deserialized into the expected type.
330    /// - The OKX API returns an error response.
331    async fn send_request<T: DeserializeOwned>(
332        &self,
333        method: Method,
334        path: &str,
335        body: Option<Vec<u8>>,
336        authenticate: bool,
337    ) -> Result<Vec<T>, OKXHttpError> {
338        let url = format!("{}{path}", self.base_url);
339        let endpoint = path.to_string();
340        let method_clone = method.clone();
341        let body_clone = body.clone();
342
343        let operation = || {
344            let url = url.clone();
345            let method = method_clone.clone();
346            let body = body_clone.clone();
347            let endpoint = endpoint.clone();
348
349            async move {
350                let mut headers = if authenticate {
351                    self.sign_request(&method, &endpoint, body.as_deref())?
352                } else {
353                    HashMap::new()
354                };
355
356                // Always set Content-Type header when body is present
357                if body.is_some() {
358                    headers.insert("Content-Type".to_string(), "application/json".to_string());
359                }
360
361                let resp = self
362                    .client
363                    .request(method.clone(), url, Some(headers), body, None, None)
364                    .await?;
365
366                tracing::trace!("Response: {resp:?}");
367
368                if resp.status.is_success() {
369                    let okx_response: OKXResponse<T> =
370                        serde_json::from_slice(&resp.body).map_err(|e| {
371                            tracing::error!("Failed to deserialize OKXResponse: {e}");
372                            OKXHttpError::JsonError(e.to_string())
373                        })?;
374
375                    if okx_response.code != OKX_SUCCESS_CODE {
376                        return Err(OKXHttpError::OkxError {
377                            error_code: okx_response.code,
378                            message: okx_response.msg,
379                        });
380                    }
381
382                    Ok(okx_response.data)
383                } else {
384                    let error_body = String::from_utf8_lossy(&resp.body);
385                    tracing::error!(
386                        "HTTP error {} with body: {error_body}",
387                        resp.status.as_str()
388                    );
389
390                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
391                        return Err(OKXHttpError::OkxError {
392                            error_code: parsed_error.code,
393                            message: parsed_error.msg,
394                        });
395                    }
396
397                    Err(OKXHttpError::UnexpectedStatus {
398                        status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
399                        body: error_body.to_string(),
400                    })
401                }
402            }
403        };
404
405        // Retry strategy based on OKX error responses and HTTP status codes:
406        //
407        // 1. Network errors: always retry (transient connection issues)
408        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
409        // 3. OKX specific retryable error codes (defined in common::consts)
410        //
411        // Note: OKX returns many permanent errors which should NOT be retried
412        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
413        let should_retry = |error: &OKXHttpError| -> bool {
414            match error {
415                OKXHttpError::HttpClientError(_) => true,
416                OKXHttpError::UnexpectedStatus { status, .. } => {
417                    status.as_u16() >= 500 || status.as_u16() == 429
418                }
419                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
420                _ => false,
421            }
422        };
423
424        let create_error = |msg: String| -> OKXHttpError {
425            if msg == "canceled" {
426                OKXHttpError::ValidationError("Request canceled".to_string())
427            } else {
428                OKXHttpError::ValidationError(msg)
429            }
430        };
431
432        self.retry_manager
433            .execute_with_retry_with_cancel(
434                &endpoint,
435                operation,
436                should_retry,
437                create_error,
438                &self.cancellation_token,
439            )
440            .await
441    }
442
443    /// Sets the position mode for an account.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if JSON serialization of `params` fails, if the HTTP
448    /// request fails, or if the response body cannot be deserialized.
449    ///
450    /// # References
451    ///
452    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
453    pub async fn http_set_position_mode(
454        &self,
455        params: SetPositionModeParams,
456    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
457        let path = "/api/v5/account/set-position-mode";
458        let body = serde_json::to_vec(&params)?;
459        self.send_request(Method::POST, path, Some(body), true)
460            .await
461    }
462
463    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the HTTP request fails, authentication is rejected
468    /// or the response cannot be deserialized.
469    ///
470    /// # References
471    ///
472    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
473    pub async fn http_get_position_tiers(
474        &self,
475        params: GetPositionTiersParams,
476    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
477        let path = Self::build_path("/api/v5/public/position-tiers", &params)?;
478        self.send_request(Method::GET, &path, None, false).await
479    }
480
481    /// Requests a list of instruments with open contracts.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if JSON serialization of `params` fails, if the HTTP
486    /// request fails, or if the response body cannot be deserialized.
487    ///
488    /// # References
489    ///
490    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
491    pub async fn http_get_instruments(
492        &self,
493        params: GetInstrumentsParams,
494    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
495        let path = Self::build_path("/api/v5/public/instruments", &params)?;
496        self.send_request(Method::GET, &path, None, false).await
497    }
498
499    /// Requests a mark price.
500    ///
501    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
502    /// users from manipulating the market and causing the contract price to fluctuate.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the HTTP request fails or if the response body
507    /// cannot be parsed into [`OKXMarkPrice`].
508    ///
509    /// # References
510    ///
511    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
512    pub async fn http_get_mark_price(
513        &self,
514        params: GetMarkPriceParams,
515    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
516        let path = Self::build_path("/api/v5/public/mark-price", &params)?;
517        self.send_request(Method::GET, &path, None, false).await
518    }
519
520    /// Requests the latest index price.
521    ///
522    /// # References
523    ///
524    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
525    pub async fn http_get_index_ticker(
526        &self,
527        params: GetIndexTickerParams,
528    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
529        let path = Self::build_path("/api/v5/market/index-tickers", &params)?;
530        self.send_request(Method::GET, &path, None, false).await
531    }
532
533    /// Requests trades history.
534    ///
535    /// # References
536    ///
537    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
538    pub async fn http_get_trades(
539        &self,
540        params: GetTradesParams,
541    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
542        let path = Self::build_path("/api/v5/market/history-trades", &params)?;
543        self.send_request(Method::GET, &path, None, false).await
544    }
545
546    /// Requests recent candlestick data.
547    ///
548    /// # References
549    ///
550    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
551    pub async fn http_get_candlesticks(
552        &self,
553        params: GetCandlesticksParams,
554    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
555        let path = Self::build_path("/api/v5/market/candles", &params)?;
556        self.send_request(Method::GET, &path, None, false).await
557    }
558
559    /// Requests historical candlestick data.
560    ///
561    /// # References
562    ///
563    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
564    pub async fn http_get_candlesticks_history(
565        &self,
566        params: GetCandlesticksParams,
567    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
568        let path = Self::build_path("/api/v5/market/history-candles", &params)?;
569        self.send_request(Method::GET, &path, None, false).await
570    }
571
572    /// Lists current open orders.
573    ///
574    /// # References
575    ///
576    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-pending>
577    pub async fn http_get_pending_orders(
578        &self,
579        params: GetPendingOrdersParams,
580    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
581        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
582        self.send_request(Method::GET, &path, None, true).await
583    }
584
585    /// Retrieves a single order’s details.
586    ///
587    /// # References
588    ///
589    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
590    pub async fn http_get_order(
591        &self,
592        params: GetOrderParams,
593    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
594        let path = Self::build_path("/api/v5/trade/order", &params)?;
595        self.send_request(Method::GET, &path, None, true).await
596    }
597
598    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
599    /// in the trading account.
600    ///
601    /// # References
602    ///
603    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
604    pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
605        let path = "/api/v5/account/balance";
606        self.send_request(Method::GET, path, None, true).await
607    }
608
609    /// Requests historical order records.
610    ///
611    /// # References
612    ///
613    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
614    pub async fn http_get_order_history(
615        &self,
616        params: GetOrderHistoryParams,
617    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
618        let path = Self::build_path("/api/v5/trade/orders-history", &params)?;
619        self.send_request(Method::GET, &path, None, true).await
620    }
621
622    /// Requests order list (pending orders).
623    ///
624    /// # References
625    ///
626    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
627    pub async fn http_get_order_list(
628        &self,
629        params: GetOrderListParams,
630    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
631        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
632        self.send_request(Method::GET, &path, None, true).await
633    }
634
635    /// Requests information on your positions. When the account is in net mode, net positions will
636    /// be displayed, and when the account is in long/short mode, long or short positions will be
637    /// displayed. Returns in reverse chronological order using ctime.
638    ///
639    /// # References
640    ///
641    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
642    pub async fn http_get_positions(
643        &self,
644        params: GetPositionsParams,
645    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
646        let path = Self::build_path("/api/v5/account/positions", &params)?;
647        self.send_request(Method::GET, &path, None, true).await
648    }
649
650    /// Requests closed or historical position data.
651    ///
652    /// # References
653    ///
654    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
655    pub async fn http_get_position_history(
656        &self,
657        params: GetPositionsHistoryParams,
658    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
659        let path = Self::build_path("/api/v5/account/positions-history", &params)?;
660        self.send_request(Method::GET, &path, None, true).await
661    }
662
663    /// Requests transaction details (fills) for the given parameters.
664    ///
665    /// # References
666    ///
667    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
668    pub async fn http_get_transaction_details(
669        &self,
670        params: GetTransactionDetailsParams,
671    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
672        let path = Self::build_path("/api/v5/trade/fills", &params)?;
673        self.send_request(Method::GET, &path, None, true).await
674    }
675}
676
677/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
678///
679/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
680/// into the Nautilus domain model.
681#[derive(Clone, Debug)]
682#[cfg_attr(
683    feature = "python",
684    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
685)]
686pub struct OKXHttpClient {
687    pub(crate) inner: Arc<OKXHttpInnerClient>,
688    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
689    cache_initialized: bool,
690}
691
692impl Default for OKXHttpClient {
693    fn default() -> Self {
694        Self::new(None, Some(60), None, None, None).expect("Failed to create default OKXHttpClient")
695    }
696}
697
698impl OKXHttpClient {
699    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
700    /// optionally overridden with a custom base url.
701    ///
702    /// This version of the client has **no credentials**, so it can only
703    /// call publicly accessible endpoints.
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the retry manager cannot be created.
708    pub fn new(
709        base_url: Option<String>,
710        timeout_secs: Option<u64>,
711        max_retries: Option<u32>,
712        retry_delay_ms: Option<u64>,
713        retry_delay_max_ms: Option<u64>,
714    ) -> anyhow::Result<Self> {
715        Ok(Self {
716            inner: Arc::new(OKXHttpInnerClient::new(
717                base_url,
718                timeout_secs,
719                max_retries,
720                retry_delay_ms,
721                retry_delay_max_ms,
722            )?),
723            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
724            cache_initialized: false,
725        })
726    }
727
728    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
729    /// the default OKX HTTP base url.
730    pub fn from_env() -> anyhow::Result<Self> {
731        Self::with_credentials(None, None, None, None, None, None, None, None)
732    }
733
734    /// Creates a new [`OKXHttpClient`] configured with credentials
735    /// for authenticated requests, optionally using a custom base url.
736    #[allow(clippy::too_many_arguments)]
737    pub fn with_credentials(
738        api_key: Option<String>,
739        api_secret: Option<String>,
740        api_passphrase: Option<String>,
741        base_url: Option<String>,
742        timeout_secs: Option<u64>,
743        max_retries: Option<u32>,
744        retry_delay_ms: Option<u64>,
745        retry_delay_max_ms: Option<u64>,
746    ) -> anyhow::Result<Self> {
747        let api_key = api_key.unwrap_or(get_env_var("OKX_API_KEY")?);
748        let api_secret = api_secret.unwrap_or(get_env_var("OKX_API_SECRET")?);
749        let api_passphrase = api_passphrase.unwrap_or(get_env_var("OKX_API_PASSPHRASE")?);
750        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
751
752        Ok(Self {
753            inner: Arc::new(OKXHttpInnerClient::with_credentials(
754                api_key,
755                api_secret,
756                api_passphrase,
757                base_url,
758                timeout_secs,
759                max_retries,
760                retry_delay_ms,
761                retry_delay_max_ms,
762            )?),
763            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
764            cache_initialized: false,
765        })
766    }
767
768    /// Retrieves an instrument from the cache.
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the instrument is not found in the cache.
773    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
774        self.instruments_cache
775            .lock()
776            .expect("`instruments_cache` lock poisoned")
777            .get(&symbol)
778            .cloned()
779            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
780    }
781
782    async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
783        if let Ok(inst) = self.get_instrument_from_cache(symbol) {
784            return Ok(inst);
785        }
786
787        for group in [
788            OKXInstrumentType::Spot,
789            OKXInstrumentType::Margin,
790            OKXInstrumentType::Futures,
791        ] {
792            if let Ok(instruments) = self.request_instruments(group).await {
793                let mut guard = self.instruments_cache.lock().unwrap();
794                for inst in instruments {
795                    guard.insert(inst.raw_symbol().inner(), inst);
796                }
797                drop(guard);
798
799                if let Ok(inst) = self.get_instrument_from_cache(symbol) {
800                    return Ok(inst);
801                }
802            }
803        }
804
805        anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
806    }
807
808    /// Cancel all pending HTTP requests.
809    pub fn cancel_all_requests(&self) {
810        self.inner.cancel_all_requests();
811    }
812
813    /// Get the cancellation token for this client.
814    pub fn cancellation_token(&self) -> &CancellationToken {
815        self.inner.cancellation_token()
816    }
817
818    /// Returns the base url being used by the client.
819    pub fn base_url(&self) -> &str {
820        self.inner.base_url.as_str()
821    }
822
823    /// Returns the public API key being used by the client.
824    pub fn api_key(&self) -> Option<&str> {
825        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
826    }
827
828    /// Checks if the client is initialized.
829    ///
830    /// The client is considered initialized if any instruments have been cached from the venue.
831    #[must_use]
832    pub const fn is_initialized(&self) -> bool {
833        self.cache_initialized
834    }
835
836    /// Generates a timestamp for initialization.
837    fn generate_ts_init(&self) -> UnixNanos {
838        get_atomic_clock_realtime().get_time_ns()
839    }
840
841    /// Returns the cached instrument symbols.
842    #[must_use]
843    /// Returns a snapshot of all instrument symbols currently held in the
844    /// internal cache.
845    ///
846    /// # Panics
847    ///
848    /// Panics if the internal mutex guarding the instrument cache is poisoned
849    /// (which would indicate a previous panic while the lock was held).
850    pub fn get_cached_symbols(&self) -> Vec<String> {
851        self.instruments_cache
852            .lock()
853            .unwrap()
854            .keys()
855            .map(std::string::ToString::to_string)
856            .collect()
857    }
858
859    /// Adds the `instruments` to the clients instrument cache.
860    ///
861    /// Any existing instruments will be replaced.
862    /// Inserts multiple instruments into the local cache.
863    ///
864    /// # Panics
865    ///
866    /// Panics if the instruments cache mutex is poisoned.
867    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
868        for inst in instruments {
869            self.instruments_cache
870                .lock()
871                .unwrap()
872                .insert(inst.raw_symbol().inner(), inst);
873        }
874        self.cache_initialized = true;
875    }
876
877    /// Adds the `instrument` to the clients instrument cache.
878    ///
879    /// Any existing instrument will be replaced.
880    /// Inserts a single instrument into the local cache.
881    ///
882    /// # Panics
883    ///
884    /// Panics if the instruments cache mutex is poisoned.
885    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
886        self.instruments_cache
887            .lock()
888            .unwrap()
889            .insert(instrument.raw_symbol().inner(), instrument);
890        self.cache_initialized = true;
891    }
892
893    /// Requests the account state for the `account_id` from OKX.
894    ///
895    /// # Errors
896    ///
897    /// Returns an error if the HTTP request fails or no account state is returned.
898    pub async fn request_account_state(
899        &self,
900        account_id: AccountId,
901    ) -> anyhow::Result<AccountState> {
902        let resp = self
903            .inner
904            .http_get_balance()
905            .await
906            .map_err(|e| anyhow::anyhow!(e))?;
907
908        let ts_init = self.generate_ts_init();
909        let raw = resp
910            .first()
911            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
912        let account_state = parse_account_state(raw, account_id, ts_init)?;
913
914        Ok(account_state)
915    }
916
917    /// Sets the position mode for the account.
918    ///
919    /// Defaults to NetMode if no position mode is provided.
920    ///
921    /// # Errors
922    ///
923    /// Returns an error if the HTTP request fails or the position mode cannot be set.
924    ///
925    /// # Note
926    ///
927    /// This endpoint only works for accounts with derivatives trading enabled.
928    /// If the account only has spot trading, this will return an error.
929    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
930        let mut params = SetPositionModeParamsBuilder::default();
931        params.pos_mode(position_mode);
932        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
933
934        match self.inner.http_set_position_mode(params).await {
935            Ok(_) => Ok(()),
936            Err(e) => {
937                // Check if this is the "Invalid request type" error for accounts without derivatives
938                if let crate::http::error::OKXHttpError::OkxError {
939                    error_code,
940                    message,
941                } = &e
942                    && error_code == "50115"
943                {
944                    tracing::warn!(
945                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
946                    );
947                    return Ok(()); // Gracefully handle this case
948                }
949                anyhow::bail!(e)
950            }
951        }
952    }
953
954    /// Requests all instruments for the `instrument_type` from OKX.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the HTTP request fails or instrument parsing fails.
959    pub async fn request_instruments(
960        &self,
961        instrument_type: OKXInstrumentType,
962    ) -> anyhow::Result<Vec<InstrumentAny>> {
963        let mut params = GetInstrumentsParamsBuilder::default();
964        params.inst_type(instrument_type);
965        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
966
967        let resp = self
968            .inner
969            .http_get_instruments(params)
970            .await
971            .map_err(|e| anyhow::anyhow!(e))?;
972
973        let ts_init = self.generate_ts_init();
974
975        let mut instruments: Vec<InstrumentAny> = Vec::new();
976        for inst in &resp {
977            if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
978                instruments.push(instrument_any);
979            }
980        }
981
982        Ok(instruments)
983    }
984
985    /// Requests the latest mark price for the `instrument_type` from OKX.
986    ///
987    /// # Errors
988    ///
989    /// Returns an error if the HTTP request fails or no mark price is returned.
990    pub async fn request_mark_price(
991        &self,
992        instrument_id: InstrumentId,
993    ) -> anyhow::Result<MarkPriceUpdate> {
994        let mut params = GetMarkPriceParamsBuilder::default();
995        params.inst_id(instrument_id.symbol.inner());
996        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
997
998        let resp = self
999            .inner
1000            .http_get_mark_price(params)
1001            .await
1002            .map_err(|e| anyhow::anyhow!(e))?;
1003
1004        let raw = resp
1005            .first()
1006            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1007        let inst = self
1008            .instrument_or_fetch(instrument_id.symbol.inner())
1009            .await?;
1010        let ts_init = self.generate_ts_init();
1011
1012        let mark_price =
1013            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1014                .map_err(|e| anyhow::anyhow!(e))?;
1015        Ok(mark_price)
1016    }
1017
1018    /// Requests the latest index price for the `instrument_id` from OKX.
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if the HTTP request fails or no index price is returned.
1023    pub async fn request_index_price(
1024        &self,
1025        instrument_id: InstrumentId,
1026    ) -> anyhow::Result<IndexPriceUpdate> {
1027        let mut params = GetIndexTickerParamsBuilder::default();
1028        params.inst_id(instrument_id.symbol.inner());
1029        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1030
1031        let resp = self
1032            .inner
1033            .http_get_index_ticker(params)
1034            .await
1035            .map_err(|e| anyhow::anyhow!(e))?;
1036
1037        let raw = resp
1038            .first()
1039            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1040        let inst = self
1041            .instrument_or_fetch(instrument_id.symbol.inner())
1042            .await?;
1043        let ts_init = self.generate_ts_init();
1044
1045        let index_price =
1046            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1047                .map_err(|e| anyhow::anyhow!(e))?;
1048        Ok(index_price)
1049    }
1050
1051    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns an error if the HTTP request fails or trade parsing fails.
1056    pub async fn request_trades(
1057        &self,
1058        instrument_id: InstrumentId,
1059        start: Option<DateTime<Utc>>,
1060        end: Option<DateTime<Utc>>,
1061        limit: Option<u32>,
1062    ) -> anyhow::Result<Vec<TradeTick>> {
1063        let mut params = GetTradesParamsBuilder::default();
1064
1065        params.inst_id(instrument_id.symbol.inner());
1066        if let Some(s) = start {
1067            params.before(s.timestamp_millis().to_string());
1068        }
1069        if let Some(e) = end {
1070            params.after(e.timestamp_millis().to_string());
1071        }
1072        if let Some(l) = limit {
1073            params.limit(l);
1074        }
1075
1076        let params = params.build().map_err(anyhow::Error::new)?;
1077
1078        // Fetch raw trades
1079        let raw_trades = self
1080            .inner
1081            .http_get_trades(params)
1082            .await
1083            .map_err(anyhow::Error::new)?;
1084
1085        let ts_init = self.generate_ts_init();
1086        let inst = self
1087            .instrument_or_fetch(instrument_id.symbol.inner())
1088            .await?;
1089
1090        let mut trades = Vec::with_capacity(raw_trades.len());
1091        for raw in raw_trades {
1092            match parse_trade_tick(
1093                &raw,
1094                instrument_id,
1095                inst.price_precision(),
1096                inst.size_precision(),
1097                ts_init,
1098            ) {
1099                Ok(trade) => trades.push(trade),
1100                Err(e) => tracing::error!("{e}"),
1101            }
1102        }
1103
1104        Ok(trades)
1105    }
1106
1107    /// Requests historical bars for the given bar type and time range.
1108    ///
1109    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
1110    /// Returns bars sorted oldest to newest.
1111    ///
1112    /// # Endpoint Selection
1113    ///
1114    /// The OKX API has different endpoints with different limits:
1115    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
1116    ///   - Used when: start is None OR age ≤ 100 days
1117    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
1118    ///   - Used when: start is Some AND age > 100 days
1119    ///
1120    /// Age is calculated as `Utc::now() - start` at the time of the first request.
1121    ///
1122    /// # Supported Aggregations
1123    ///
1124    /// Maps to OKX bar query parameter:
1125    /// - `Second` → `{n}s`
1126    /// - `Minute` → `{n}m`
1127    /// - `Hour` → `{n}H`
1128    /// - `Day` → `{n}D`
1129    /// - `Week` → `{n}W`
1130    /// - `Month` → `{n}M`
1131    ///
1132    /// # Pagination
1133    ///
1134    /// - Uses `before` parameter for backwards pagination
1135    /// - Pages backwards from end time (or now) to start time
1136    /// - Stops when: limit reached, time window covered, or API returns empty
1137    /// - Rate limit safety: ≥ 50ms between requests
1138    ///
1139    /// # Panics
1140    ///
1141    /// May panic if internal data structures are in an unexpected state.
1142    ///
1143    /// # References
1144    ///
1145    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
1146    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
1147    pub async fn request_bars(
1148        &self,
1149        bar_type: BarType,
1150        start: Option<DateTime<Utc>>,
1151        mut end: Option<DateTime<Utc>>,
1152        limit: Option<u32>,
1153    ) -> anyhow::Result<Vec<Bar>> {
1154        const HISTORY_SPLIT_DAYS: i64 = 100;
1155        const MAX_PAGES_SOFT: usize = 500;
1156
1157        let limit = if limit == Some(0) { None } else { limit };
1158
1159        anyhow::ensure!(
1160            bar_type.aggregation_source() == AggregationSource::External,
1161            "Only EXTERNAL aggregation is supported"
1162        );
1163        if let (Some(s), Some(e)) = (start, end) {
1164            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1165        }
1166
1167        let now = Utc::now();
1168        if let Some(s) = start
1169            && s > now
1170        {
1171            return Ok(Vec::new());
1172        }
1173        if let Some(e) = end
1174            && e > now
1175        {
1176            end = Some(now);
1177        }
1178
1179        let spec = bar_type.spec();
1180        let step = spec.step.get();
1181        let bar_param = match spec.aggregation {
1182            BarAggregation::Second => format!("{step}s"),
1183            BarAggregation::Minute => format!("{step}m"),
1184            BarAggregation::Hour => format!("{step}H"),
1185            BarAggregation::Day => format!("{step}D"),
1186            BarAggregation::Week => format!("{step}W"),
1187            BarAggregation::Month => format!("{step}M"),
1188            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1189        };
1190
1191        let slot_ms: i64 = match spec.aggregation {
1192            BarAggregation::Second => (step as i64) * 1_000,
1193            BarAggregation::Minute => (step as i64) * 60_000,
1194            BarAggregation::Hour => (step as i64) * 3_600_000,
1195            BarAggregation::Day => (step as i64) * 86_400_000,
1196            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1197            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1198            _ => unreachable!("Unsupported aggregation should have been caught above"),
1199        };
1200        let slot_ns: i64 = slot_ms * 1_000_000;
1201
1202        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1203        enum Mode {
1204            Latest,
1205            Backward,
1206            Range,
1207        }
1208
1209        let mode = match (start, end) {
1210            (None, None) => Mode::Latest,
1211            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1212            (None, Some(_)) => Mode::Backward,
1213            (Some(_), Some(_)) => Mode::Range,
1214        };
1215
1216        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1217        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1218
1219        // Floor start and ceiling end to bar boundaries for cleaner API requests
1220        let start_ms = start.map(|s| {
1221            let ms = s.timestamp_millis();
1222            if slot_ms > 0 {
1223                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1224            } else {
1225                ms
1226            }
1227        });
1228        let end_ms = end.map(|e| {
1229            let ms = e.timestamp_millis();
1230            if slot_ms > 0 {
1231                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1232            } else {
1233                ms
1234            }
1235        });
1236        let now_ms = now.timestamp_millis();
1237
1238        let symbol = bar_type.instrument_id().symbol;
1239        let inst = self.instrument_or_fetch(symbol.inner()).await?;
1240
1241        let mut out: Vec<Bar> = Vec::new();
1242        let mut pages = 0usize;
1243
1244        // IMPORTANT: OKX API behavior:
1245        // - With 'after' parameter: returns bars with timestamp > after (going forward)
1246        // - With 'before' parameter: returns bars with timestamp < before (going backward)
1247        // For Range mode, we use 'before' starting from the end time to get bars before it
1248        let mut after_ms: Option<i64> = None;
1249        let mut before_ms: Option<i64> = match mode {
1250            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1251            Mode::Range => {
1252                // For Range, start from the end time (or current time if no end specified)
1253                // The API will return bars with timestamp < before_ms
1254                Some(end_ms.unwrap_or(now_ms))
1255            }
1256            Mode::Latest => None,
1257        };
1258
1259        // For Range mode, we'll paginate backwards like Backward mode
1260        let mut forward_prepend_mode = matches!(mode, Mode::Range);
1261
1262        // Adjust before_ms to ensure we get data from the API
1263        // OKX API might not have bars for the very recent past
1264        // This handles both explicit end=now and the actor layer setting end=now when it's None
1265        if matches!(mode, Mode::Backward | Mode::Range)
1266            && let Some(b) = before_ms
1267        {
1268            // OKX endpoints have different data availability windows:
1269            // - Regular endpoint: has most recent data but limited depth
1270            // - History endpoint: has deep history but lags behind current time
1271            // Use a small buffer to avoid the "dead zone"
1272            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
1273            if b >= now_ms.saturating_sub(buffer_ms) {
1274                before_ms = Some(now_ms.saturating_sub(buffer_ms));
1275            }
1276        }
1277
1278        let mut have_latest_first_page = false;
1279        let mut progressless_loops = 0u8;
1280
1281        loop {
1282            if let Some(lim) = limit
1283                && lim > 0
1284                && out.len() >= lim as usize
1285            {
1286                break;
1287            }
1288            if pages >= MAX_PAGES_SOFT {
1289                break;
1290            }
1291
1292            let pivot_ms = if let Some(a) = after_ms {
1293                a
1294            } else if let Some(b) = before_ms {
1295                b
1296            } else {
1297                now_ms
1298            };
1299            // Choose endpoint based on how old the data is:
1300            // - Use regular endpoint for recent data (< 1 hour old)
1301            // - Use history endpoint for older data (> 1 hour old)
1302            // This avoids the "gap" where history endpoint has no recent data
1303            // and regular endpoint has limited depth
1304            let age_ms = now_ms.saturating_sub(pivot_ms);
1305            let age_hours = age_ms / (60 * 60 * 1000);
1306            let using_history = age_hours > 1; // Use history if data is > 1 hour old
1307
1308            let page_ceiling = if using_history { 100 } else { 300 };
1309            let remaining = limit
1310                .filter(|&l| l > 0) // Treat limit=0 as no limit
1311                .map(|l| (l as usize).saturating_sub(out.len()))
1312                .unwrap_or(page_ceiling);
1313            let page_cap = remaining.min(page_ceiling);
1314
1315            let mut p = GetCandlesticksParamsBuilder::default();
1316            p.inst_id(symbol.as_str())
1317                .bar(&bar_param)
1318                .limit(page_cap as u32);
1319
1320            // Track whether this planned request uses BEFORE or AFTER.
1321            let mut req_used_before = false;
1322
1323            match mode {
1324                Mode::Latest => {
1325                    if have_latest_first_page && let Some(b) = before_ms {
1326                        p.before_ms(b);
1327                        req_used_before = true;
1328                    }
1329                }
1330                Mode::Backward => {
1331                    if let Some(b) = before_ms {
1332                        p.before_ms(b);
1333                        req_used_before = true;
1334                    }
1335                }
1336                Mode::Range => {
1337                    // For first request with regular endpoint, try without parameters
1338                    // to get the most recent bars, then filter
1339                    if pages == 0 && !using_history {
1340                        // Don't set any time parameters on first request
1341                        // This gets the most recent bars available
1342                    } else if forward_prepend_mode {
1343                        if let Some(b) = before_ms {
1344                            p.before_ms(b);
1345                            req_used_before = true;
1346                        }
1347                    } else if let Some(a) = after_ms {
1348                        p.after_ms(a);
1349                    }
1350                }
1351            }
1352
1353            let params = p.build().map_err(anyhow::Error::new)?;
1354
1355            let mut raw = if using_history {
1356                self.inner
1357                    .http_get_candlesticks_history(params.clone())
1358                    .await
1359                    .map_err(anyhow::Error::new)?
1360            } else {
1361                self.inner
1362                    .http_get_candlesticks(params.clone())
1363                    .await
1364                    .map_err(anyhow::Error::new)?
1365            };
1366
1367            // --- Fallbacks on empty page ---
1368            if raw.is_empty() {
1369                // LATEST: retry same cursor via history, then step back a page-interval before giving up
1370                if matches!(mode, Mode::Latest)
1371                    && have_latest_first_page
1372                    && !using_history
1373                    && let Some(b) = before_ms
1374                {
1375                    let mut p2 = GetCandlesticksParamsBuilder::default();
1376                    p2.inst_id(symbol.as_str())
1377                        .bar(&bar_param)
1378                        .limit(page_cap as u32);
1379                    p2.before_ms(b);
1380                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1381                    let raw2 = self
1382                        .inner
1383                        .http_get_candlesticks_history(params2)
1384                        .await
1385                        .map_err(anyhow::Error::new)?;
1386                    if !raw2.is_empty() {
1387                        raw = raw2;
1388                    } else {
1389                        // Step back one page interval and retry loop
1390                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1391                        before_ms = Some(b.saturating_sub(jump));
1392                        progressless_loops = progressless_loops.saturating_add(1);
1393                        if progressless_loops >= 3 {
1394                            break;
1395                        }
1396                        continue;
1397                    }
1398                }
1399
1400                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
1401
1402                // If still empty: for Range after first page, try a single backstep window using BEFORE
1403                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1404                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1405                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1406
1407                    let mut p2 = GetCandlesticksParamsBuilder::default();
1408                    p2.inst_id(symbol.as_str())
1409                        .bar(&bar_param)
1410                        .limit(page_cap as u32)
1411                        .before_ms(pivot_back);
1412                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1413                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1414                        > HISTORY_SPLIT_DAYS
1415                    {
1416                        self.inner.http_get_candlesticks_history(params2).await
1417                    } else {
1418                        self.inner.http_get_candlesticks(params2).await
1419                    }
1420                    .map_err(anyhow::Error::new)?;
1421                    if raw2.is_empty() {
1422                        break;
1423                    } else {
1424                        raw = raw2;
1425                        forward_prepend_mode = true;
1426                        req_used_before = true;
1427                    }
1428                }
1429
1430                // First LATEST page empty: jump back >100d to force history, then continue loop
1431                if raw.is_empty()
1432                    && matches!(mode, Mode::Latest)
1433                    && !have_latest_first_page
1434                    && !using_history
1435                {
1436                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1437                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1438                    have_latest_first_page = true;
1439                    continue;
1440                }
1441
1442                // Still empty for any other case? Just break.
1443                if raw.is_empty() {
1444                    break;
1445                }
1446            }
1447            // --- end fallbacks ---
1448
1449            pages += 1;
1450
1451            // Parse, oldest → newest
1452            let ts_init = self.generate_ts_init();
1453            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1454            for r in &raw {
1455                page.push(parse_candlestick(
1456                    r,
1457                    bar_type,
1458                    inst.price_precision(),
1459                    inst.size_precision(),
1460                    ts_init,
1461                )?);
1462            }
1463            page.reverse();
1464
1465            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1466            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1467
1468            // Range filter (inclusive)
1469            // For Range mode, if we have no bars yet and this is an early page,
1470            // be more tolerant with the start boundary to handle gaps in data
1471            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1472                && out.is_empty()
1473                && pages < 2
1474            {
1475                // On first pages of Range mode with no data yet, include the most recent bar
1476                // even if it's slightly before our start time (within 2 bar periods)
1477                // BUT we want ALL bars in the page that are within our range
1478                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
1479
1480                // Debug: log the page range
1481                if !page.is_empty() {
1482                    tracing::debug!(
1483                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1484                        page.len(),
1485                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
1486                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
1487                        start_ms,
1488                        end_ms
1489                    );
1490                }
1491
1492                let result: Vec<Bar> = page
1493                    .clone()
1494                    .into_iter()
1495                    .filter(|b| {
1496                        let ts = b.ts_event.as_i64();
1497                        // Accept bars from (start - tolerance) to end
1498                        let ok_after =
1499                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1500                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1501                        ok_after && ok_before
1502                    })
1503                    .collect();
1504
1505                result
1506            } else {
1507                // Normal filtering
1508                page.clone()
1509                    .into_iter()
1510                    .filter(|b| {
1511                        let ts = b.ts_event.as_i64();
1512                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1513                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1514                        ok_after && ok_before
1515                    })
1516                    .collect()
1517            };
1518
1519            if !page.is_empty() && filtered.is_empty() {
1520                // For Range mode, if all bars are before our start time, there's no point continuing
1521                if matches!(mode, Mode::Range)
1522                    && !forward_prepend_mode
1523                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1524                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1525                {
1526                    // Bars are too old (more than 2 bar periods before start), stop
1527                    break;
1528                }
1529            }
1530
1531            // Track contribution for progress guard
1532            let contribution;
1533
1534            if out.is_empty() {
1535                contribution = filtered.len();
1536                out = filtered;
1537            } else {
1538                match mode {
1539                    Mode::Backward | Mode::Latest => {
1540                        if let Some(first) = out.first() {
1541                            filtered.retain(|b| b.ts_event < first.ts_event);
1542                        }
1543                        contribution = filtered.len();
1544                        if contribution != 0 {
1545                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1546                            new_out.extend_from_slice(&filtered);
1547                            new_out.extend_from_slice(&out);
1548                            out = new_out;
1549                        }
1550                    }
1551                    Mode::Range => {
1552                        if forward_prepend_mode || req_used_before {
1553                            // We are backfilling older pages: prepend them.
1554                            if let Some(first) = out.first() {
1555                                filtered.retain(|b| b.ts_event < first.ts_event);
1556                            }
1557                            contribution = filtered.len();
1558                            if contribution != 0 {
1559                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1560                                new_out.extend_from_slice(&filtered);
1561                                new_out.extend_from_slice(&out);
1562                                out = new_out;
1563                            }
1564                        } else {
1565                            // Normal forward: append newer pages.
1566                            if let Some(last) = out.last() {
1567                                filtered.retain(|b| b.ts_event > last.ts_event);
1568                            }
1569                            contribution = filtered.len();
1570                            out.extend(filtered);
1571                        }
1572                    }
1573                }
1574            }
1575
1576            // Duplicate-window mitigation for Latest/Backward
1577            if contribution == 0
1578                && matches!(mode, Mode::Latest | Mode::Backward)
1579                && let Some(b) = before_ms
1580            {
1581                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1582                let new_b = b.saturating_sub(jump);
1583                if new_b != b {
1584                    before_ms = Some(new_b);
1585                }
1586            }
1587
1588            if contribution == 0 {
1589                progressless_loops = progressless_loops.saturating_add(1);
1590                if progressless_loops >= 3 {
1591                    break;
1592                }
1593            } else {
1594                progressless_loops = 0;
1595
1596                // Advance cursors only when we made progress
1597                match mode {
1598                    Mode::Latest | Mode::Backward => {
1599                        if let Some(oldest) = page_oldest_ms {
1600                            before_ms = Some(oldest.saturating_sub(1));
1601                            have_latest_first_page = true;
1602                        } else {
1603                            break;
1604                        }
1605                    }
1606                    Mode::Range => {
1607                        if forward_prepend_mode || req_used_before {
1608                            if let Some(oldest) = page_oldest_ms {
1609                                // Move back by at least one bar period to avoid getting the same data
1610                                let jump_back = slot_ms.max(60_000); // At least 1 minute
1611                                before_ms = Some(oldest.saturating_sub(jump_back));
1612                                after_ms = None;
1613                            } else {
1614                                break;
1615                            }
1616                        } else if let Some(newest) = page_newest_ms {
1617                            after_ms = Some(newest.saturating_add(1));
1618                            before_ms = None;
1619                        } else {
1620                            break;
1621                        }
1622                    }
1623                }
1624            }
1625
1626            // Stop conditions
1627            if let Some(lim) = limit
1628                && lim > 0
1629                && out.len() >= lim as usize
1630            {
1631                break;
1632            }
1633            if let Some(ens) = end_ns
1634                && let Some(last) = out.last()
1635                && last.ts_event.as_i64() >= ens
1636            {
1637                break;
1638            }
1639            if let Some(sns) = start_ns
1640                && let Some(first) = out.first()
1641                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1642                && first.ts_event.as_i64() <= sns
1643            {
1644                // For Range mode, check if we have all bars up to the end time
1645                if matches!(mode, Mode::Range) {
1646                    // Don't stop if we haven't reached the end time yet
1647                    if let Some(ens) = end_ns
1648                        && let Some(last) = out.last()
1649                    {
1650                        let last_ts = last.ts_event.as_i64();
1651                        if last_ts < ens {
1652                            // We have bars before start but haven't reached end, need to continue forward
1653                            // Switch from backward to forward pagination
1654                            forward_prepend_mode = false;
1655                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1656                            before_ms = None;
1657                            continue;
1658                        }
1659                    }
1660                }
1661                break;
1662            }
1663
1664            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1665        }
1666
1667        // Final rescue for FORWARD/RANGE when nothing gathered
1668        if out.is_empty() && matches!(mode, Mode::Range) {
1669            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1670            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1671            let mut p = GetCandlesticksParamsBuilder::default();
1672            p.inst_id(symbol.as_str())
1673                .bar(&bar_param)
1674                .limit(300)
1675                .before_ms(pivot);
1676            let params = p.build().map_err(anyhow::Error::new)?;
1677            let raw = if hist {
1678                self.inner.http_get_candlesticks_history(params).await
1679            } else {
1680                self.inner.http_get_candlesticks(params).await
1681            }
1682            .map_err(anyhow::Error::new)?;
1683            if !raw.is_empty() {
1684                let ts_init = self.generate_ts_init();
1685                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1686                for r in &raw {
1687                    page.push(parse_candlestick(
1688                        r,
1689                        bar_type,
1690                        inst.price_precision(),
1691                        inst.size_precision(),
1692                        ts_init,
1693                    )?);
1694                }
1695                page.reverse();
1696                out = page
1697                    .into_iter()
1698                    .filter(|b| {
1699                        let ts = b.ts_event.as_i64();
1700                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1701                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1702                        ok_after && ok_before
1703                    })
1704                    .collect();
1705            }
1706        }
1707
1708        // Trim against end bound if needed (keep ≤ end)
1709        if let Some(ens) = end_ns {
1710            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
1711                out.pop();
1712            }
1713        }
1714
1715        // Clamp first bar for Range when using forward pagination
1716        if matches!(mode, Mode::Range)
1717            && !forward_prepend_mode
1718            && let Some(sns) = start_ns
1719        {
1720            let lower = sns.saturating_sub(slot_ns);
1721            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
1722                out.remove(0);
1723            }
1724        }
1725
1726        if let Some(lim) = limit
1727            && lim > 0
1728            && out.len() > lim as usize
1729        {
1730            out.truncate(lim as usize);
1731        }
1732
1733        Ok(out)
1734    }
1735
1736    /// Requests historical order status reports for the given parameters.
1737    ///
1738    /// # References
1739    ///
1740    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
1741    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
1742    #[allow(clippy::too_many_arguments)]
1743    pub async fn request_order_status_reports(
1744        &self,
1745        account_id: AccountId,
1746        instrument_type: Option<OKXInstrumentType>,
1747        instrument_id: Option<InstrumentId>,
1748        start: Option<DateTime<Utc>>,
1749        end: Option<DateTime<Utc>>,
1750        open_only: bool,
1751        limit: Option<u32>,
1752    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1753        // Build params for order history
1754        let mut history_params = GetOrderHistoryParamsBuilder::default();
1755
1756        let instrument_type = if let Some(instrument_type) = instrument_type {
1757            instrument_type
1758        } else {
1759            let instrument_id = instrument_id.ok_or_else(|| {
1760                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1761            })?;
1762            let instrument = self
1763                .instrument_or_fetch(instrument_id.symbol.inner())
1764                .await?;
1765            okx_instrument_type(&instrument)?
1766        };
1767
1768        history_params.inst_type(instrument_type);
1769
1770        if let Some(instrument_id) = instrument_id.as_ref() {
1771            history_params.inst_id(instrument_id.symbol.inner().to_string());
1772        }
1773
1774        if let Some(limit) = limit {
1775            history_params.limit(limit);
1776        }
1777
1778        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
1779
1780        // Build params for pending orders
1781        let mut pending_params = GetOrderListParamsBuilder::default();
1782        pending_params.inst_type(instrument_type);
1783
1784        if let Some(instrument_id) = instrument_id.as_ref() {
1785            pending_params.inst_id(instrument_id.symbol.inner().to_string());
1786        }
1787
1788        if let Some(limit) = limit {
1789            pending_params.limit(limit);
1790        }
1791
1792        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
1793
1794        let combined_resp = if open_only {
1795            // Only request pending/open orders
1796            self.inner
1797                .http_get_order_list(pending_params)
1798                .await
1799                .map_err(|e| anyhow::anyhow!(e))?
1800        } else {
1801            // Make both requests concurrently
1802            let (history_resp, pending_resp) = tokio::try_join!(
1803                self.inner.http_get_order_history(history_params),
1804                self.inner.http_get_order_list(pending_params)
1805            )
1806            .map_err(|e| anyhow::anyhow!(e))?;
1807
1808            // Combine both responses
1809            let mut combined_resp = history_resp;
1810            combined_resp.extend(pending_resp);
1811            combined_resp
1812        };
1813
1814        // Prepare time range filter
1815        let start_ns = start.map(UnixNanos::from);
1816        let end_ns = end.map(UnixNanos::from);
1817
1818        let ts_init = self.generate_ts_init();
1819        let mut reports = Vec::with_capacity(combined_resp.len());
1820
1821        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
1822        let mut seen = AHashSet::new();
1823
1824        for order in combined_resp {
1825            if seen.contains(&order.cl_ord_id) {
1826                continue; // Reserved pending already reported
1827            }
1828            seen.insert(order.cl_ord_id);
1829
1830            let inst = self.instrument_or_fetch(order.inst_id).await?;
1831
1832            let report = parse_order_status_report(
1833                &order,
1834                account_id,
1835                inst.id(),
1836                inst.price_precision(),
1837                inst.size_precision(),
1838                ts_init,
1839            );
1840
1841            if let Some(start_ns) = start_ns
1842                && report.ts_last < start_ns
1843            {
1844                continue;
1845            }
1846            if let Some(end_ns) = end_ns
1847                && report.ts_last > end_ns
1848            {
1849                continue;
1850            }
1851
1852            reports.push(report);
1853        }
1854
1855        Ok(reports)
1856    }
1857
1858    /// Requests fill reports (transaction details) for the given parameters.
1859    ///
1860    /// # References
1861    ///
1862    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
1863    pub async fn request_fill_reports(
1864        &self,
1865        account_id: AccountId,
1866        instrument_type: Option<OKXInstrumentType>,
1867        instrument_id: Option<InstrumentId>,
1868        start: Option<DateTime<Utc>>,
1869        end: Option<DateTime<Utc>>,
1870        limit: Option<u32>,
1871    ) -> anyhow::Result<Vec<FillReport>> {
1872        let mut params = GetTransactionDetailsParamsBuilder::default();
1873
1874        let instrument_type = if let Some(instrument_type) = instrument_type {
1875            instrument_type
1876        } else {
1877            let instrument_id = instrument_id.ok_or_else(|| {
1878                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1879            })?;
1880            let instrument = self
1881                .instrument_or_fetch(instrument_id.symbol.inner())
1882                .await?;
1883            okx_instrument_type(&instrument)?
1884        };
1885
1886        params.inst_type(instrument_type);
1887
1888        if let Some(instrument_id) = instrument_id {
1889            let instrument = self
1890                .instrument_or_fetch(instrument_id.symbol.inner())
1891                .await?;
1892            let instrument_type = okx_instrument_type(&instrument)?;
1893            params.inst_type(instrument_type);
1894            params.inst_id(instrument_id.symbol.inner().to_string());
1895        }
1896
1897        if let Some(limit) = limit {
1898            params.limit(limit);
1899        }
1900
1901        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1902
1903        let resp = self
1904            .inner
1905            .http_get_transaction_details(params)
1906            .await
1907            .map_err(|e| anyhow::anyhow!(e))?;
1908
1909        // Prepare time range filter
1910        let start_ns = start.map(UnixNanos::from);
1911        let end_ns = end.map(UnixNanos::from);
1912
1913        let ts_init = self.generate_ts_init();
1914        let mut reports = Vec::with_capacity(resp.len());
1915
1916        for detail in resp {
1917            let inst = self.instrument_or_fetch(detail.inst_id).await?;
1918
1919            let report = parse_fill_report(
1920                detail,
1921                account_id,
1922                inst.id(),
1923                inst.price_precision(),
1924                inst.size_precision(),
1925                ts_init,
1926            )?;
1927
1928            if let Some(start_ns) = start_ns
1929                && report.ts_event < start_ns
1930            {
1931                continue;
1932            }
1933
1934            if let Some(end_ns) = end_ns
1935                && report.ts_event > end_ns
1936            {
1937                continue;
1938            }
1939
1940            reports.push(report);
1941        }
1942
1943        Ok(reports)
1944    }
1945
1946    /// Requests current position status reports for the given parameters.
1947    ///
1948    /// # References
1949    ///
1950    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>.
1951    pub async fn request_position_status_reports(
1952        &self,
1953        account_id: AccountId,
1954        instrument_type: Option<OKXInstrumentType>,
1955        instrument_id: Option<InstrumentId>,
1956    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1957        let mut params = GetPositionsParamsBuilder::default();
1958
1959        let instrument_type = if let Some(instrument_type) = instrument_type {
1960            instrument_type
1961        } else {
1962            let instrument_id = instrument_id.ok_or_else(|| {
1963                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1964            })?;
1965            let instrument = self
1966                .instrument_or_fetch(instrument_id.symbol.inner())
1967                .await?;
1968            okx_instrument_type(&instrument)?
1969        };
1970
1971        params.inst_type(instrument_type);
1972
1973        instrument_id
1974            .as_ref()
1975            .map(|i| params.inst_id(i.symbol.inner()));
1976
1977        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1978
1979        let resp = self
1980            .inner
1981            .http_get_positions(params)
1982            .await
1983            .map_err(|e| anyhow::anyhow!(e))?;
1984
1985        let ts_init = self.generate_ts_init();
1986        let mut reports = Vec::with_capacity(resp.len());
1987
1988        for position in resp {
1989            let inst = self.instrument_or_fetch(position.inst_id).await?;
1990
1991            let report = parse_position_status_report(
1992                position,
1993                account_id,
1994                inst.id(),
1995                inst.size_precision(),
1996                ts_init,
1997            );
1998            reports.push(report);
1999        }
2000
2001        Ok(reports)
2002    }
2003}