nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers a
20//! *strongly-typed* interface to all exchange endpoints currently required by
21//! NautilusTrader.
22//!
23//! Key responsibilities handled internally:
24//! • Request signing and header composition for private routes (HMAC-SHA256).
25//! • Rate-limiting based on the public OKX specification.
26//! • Zero-copy deserialization of large JSON payloads into domain models.
27//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
28//!
29//! # Quick links to official docs
30//! | Domain                               | OKX reference                                                             |
31//! |--------------------------------------|---------------------------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data>                    |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>                       |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>                       |
35
36use std::{
37    collections::HashMap,
38    fmt::Debug,
39    num::NonZeroU32,
40    sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::AHashSet;
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46    UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
47};
48use nautilus_model::{
49    data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
50    enums::{AggregationSource, BarAggregation},
51    events::AccountState,
52    identifiers::{AccountId, InstrumentId},
53    instruments::{Instrument, InstrumentAny},
54    reports::{FillReport, OrderStatusReport, PositionStatusReport},
55};
56use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
57use reqwest::{Method, StatusCode, header::USER_AGENT};
58use serde::{Deserialize, Serialize, de::DeserializeOwned};
59use ustr::Ustr;
60
61use super::{
62    error::OKXHttpError,
63    models::{
64        OKXAccount, OKXIndexTicker, OKXMarkPrice, OKXOrderHistory, OKXPosition, OKXPositionHistory,
65        OKXPositionTier, OKXTransactionDetail,
66    },
67    query::{
68        GetCandlesticksParams, GetCandlesticksParamsBuilder, GetIndexTickerParams,
69        GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
70        GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOrderHistoryParams,
71        GetOrderHistoryParamsBuilder, GetOrderListParams, GetOrderListParamsBuilder,
72        GetPositionTiersParams, GetPositionsHistoryParams, GetPositionsParams,
73        GetPositionsParamsBuilder, GetTradesParams, GetTradesParamsBuilder,
74        GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder, SetPositionModeParams,
75        SetPositionModeParamsBuilder,
76    },
77};
78use crate::{
79    common::{
80        consts::OKX_HTTP_URL,
81        credential::Credential,
82        enums::{OKXInstrumentType, OKXPositionMode},
83        models::OKXInstrument,
84        parse::{
85            okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
86            parse_index_price_update, parse_instrument_any, parse_mark_price_update,
87            parse_order_status_report, parse_position_status_report, parse_trade_tick,
88        },
89    },
90    http::{
91        models::{OKXCandlestick, OKXTrade},
92        query::{GetOrderParams, GetPendingOrdersParams},
93    },
94};
95
96const OKX_SUCCESS_CODE: &str = "0";
97
98/// Default OKX REST API rate limit: 500 requests per 2 seconds.
99///
100/// - Sub-account order limit: 1000 requests per 2 seconds.
101/// - Account balance: 10 requests per 2 seconds.
102/// - Account instruments: 20 requests per 2 seconds.
103///
104/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
105/// that should accommodate most use cases while respecting OKX's documented limits.
106pub static OKX_REST_QUOTA: LazyLock<Quota> =
107    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
108
109/// Represents an OKX HTTP response.
110#[derive(Debug, Serialize, Deserialize)]
111pub struct OKXResponse<T> {
112    /// The OKX response code, which is `"0"` for success.
113    pub code: String,
114    /// A message string which can be informational or describe an error cause.
115    pub msg: String,
116    /// The typed data returned by the OKX endpoint.
117    pub data: Vec<T>,
118}
119
120/// Provides a HTTP client for connecting to the [OKX](https://okx.com) REST API.
121///
122/// This client wraps the underlying [`HttpClient`] to handle functionality
123/// specific to OKX, such as request signing (for authenticated endpoints),
124/// forming request URLs, and deserializing responses into specific data models.
125pub struct OKXHttpInnerClient {
126    base_url: String,
127    client: HttpClient,
128    credential: Option<Credential>,
129}
130
131impl Default for OKXHttpInnerClient {
132    fn default() -> Self {
133        Self::new(None, Some(60))
134    }
135}
136
137impl Debug for OKXHttpInnerClient {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        let credential = self.credential.as_ref().map(|_| "<redacted>");
140        f.debug_struct(stringify!(OKXHttpInnerClient))
141            .field("base_url", &self.base_url)
142            .field("credential", &credential)
143            .finish_non_exhaustive()
144    }
145}
146
147impl OKXHttpInnerClient {
148    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
149    /// optionally overridden with a custom base URL.
150    ///
151    /// This version of the client has **no credentials**, so it can only
152    /// call publicly accessible endpoints.
153    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
154        Self {
155            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
156            client: HttpClient::new(
157                Self::default_headers(),
158                vec![],
159                vec![],
160                Some(*OKX_REST_QUOTA),
161                timeout_secs,
162            ),
163            credential: None,
164        }
165    }
166
167    /// Creates a new [`OKXHttpClient`] configured with credentials
168    /// for authenticated requests, optionally using a custom base URL.
169    pub fn with_credentials(
170        api_key: String,
171        api_secret: String,
172        api_passphrase: String,
173        base_url: String,
174        timeout_secs: Option<u64>,
175    ) -> Self {
176        Self {
177            base_url,
178            client: HttpClient::new(
179                Self::default_headers(),
180                vec![],
181                vec![],
182                Some(*OKX_REST_QUOTA),
183                timeout_secs,
184            ),
185            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
186        }
187    }
188
189    /// Builds the default headers to include with each request (e.g., `User-Agent`).
190    fn default_headers() -> HashMap<String, String> {
191        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
192    }
193
194    /// Combine a base path with a `serde_urlencoded` query string if one exists.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the query string serialization fails.
199    fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
200        let query = serde_urlencoded::to_string(params)
201            .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
202        if query.is_empty() {
203            Ok(base.to_owned())
204        } else {
205            Ok(format!("{base}?{query}"))
206        }
207    }
208
209    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
210    ///
211    /// # Errors
212    ///
213    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
214    /// but the request requires authentication.
215    fn sign_request(
216        &self,
217        method: &Method,
218        path: &str,
219        body: Option<&[u8]>,
220    ) -> Result<HashMap<String, String>, OKXHttpError> {
221        let credential = match self.credential.as_ref() {
222            Some(c) => c,
223            None => return Err(OKXHttpError::MissingCredentials),
224        };
225
226        let body_str = body
227            .and_then(|b| String::from_utf8(b.to_vec()).ok())
228            .unwrap_or_default();
229
230        tracing::debug!("{method} {path}");
231
232        let api_key = credential.api_key.clone().to_string();
233        let api_passphrase = credential.api_passphrase.clone();
234        let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string();
235        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
236
237        let mut headers = HashMap::new();
238        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
239        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
240        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
241        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
242
243        Ok(headers)
244    }
245
246    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
247    ///
248    /// Internally, this method handles:
249    /// - Building the URL from `base_url` + `path`
250    /// - Optionally signing the request
251    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`]
252    ///
253    /// # Errors
254    ///
255    /// This function will return an error if:
256    /// - The HTTP request fails.
257    /// - Authentication is required but credentials are missing.
258    /// - The response cannot be deserialized into the expected type.
259    /// - The OKX API returns an error response.
260    async fn send_request<T: DeserializeOwned>(
261        &self,
262        method: Method,
263        path: &str,
264        body: Option<Vec<u8>>,
265        authenticate: bool,
266    ) -> Result<Vec<T>, OKXHttpError> {
267        let url = format!("{}{path}", self.base_url);
268
269        let mut headers = if authenticate {
270            self.sign_request(&method, path, body.as_deref())?
271        } else {
272            HashMap::new()
273        };
274
275        // Always set Content-Type header when body is present
276        if body.is_some() {
277            headers.insert("Content-Type".to_string(), "application/json".to_string());
278        }
279
280        let resp = self
281            .client
282            .request(method.clone(), url, Some(headers), body, None, None)
283            .await?;
284
285        tracing::trace!("Response: {resp:?}");
286
287        if resp.status.is_success() {
288            let okx_response: OKXResponse<T> = serde_json::from_slice(&resp.body).map_err(|e| {
289                tracing::error!("Failed to deserialize OKXResponse: {e}");
290                OKXHttpError::JsonError(e.to_string())
291            })?;
292
293            if okx_response.code != OKX_SUCCESS_CODE {
294                return Err(OKXHttpError::OkxError {
295                    error_code: okx_response.code,
296                    message: okx_response.msg,
297                });
298            }
299
300            Ok(okx_response.data)
301        } else {
302            let error_body = String::from_utf8_lossy(&resp.body);
303            tracing::error!(
304                "HTTP error {} with body: {error_body}",
305                resp.status.as_str()
306            );
307
308            if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
309                return Err(OKXHttpError::OkxError {
310                    error_code: parsed_error.code,
311                    message: parsed_error.msg,
312                });
313            }
314
315            Err(OKXHttpError::UnexpectedStatus {
316                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
317                body: error_body.to_string(),
318            })
319        }
320    }
321
322    /// Sets the position mode for an account.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if JSON serialization of `params` fails, if the HTTP
327    /// request fails, or if the response body cannot be deserialized.
328    ///
329    /// # References
330    ///
331    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
332    pub async fn http_set_position_mode(
333        &self,
334        params: SetPositionModeParams,
335    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
336        let path = "/api/v5/account/set-position-mode";
337        let body = serde_json::to_vec(&params)?;
338        self.send_request(Method::POST, path, Some(body), true)
339            .await
340    }
341
342    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the HTTP request fails, authentication is rejected
347    /// or the response cannot be deserialized.
348    ///
349    /// # References
350    ///
351    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
352    pub async fn http_get_position_tiers(
353        &self,
354        params: GetPositionTiersParams,
355    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
356        let path = Self::build_path("/api/v5/public/position-tiers", &params)?;
357        self.send_request(Method::GET, &path, None, false).await
358    }
359
360    /// Requests a list of instruments with open contracts.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if JSON serialization of `params` fails, if the HTTP
365    /// request fails, or if the response body cannot be deserialized.
366    ///
367    /// # References
368    ///
369    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
370    pub async fn http_get_instruments(
371        &self,
372        params: GetInstrumentsParams,
373    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
374        let path = Self::build_path("/api/v5/public/instruments", &params)?;
375        self.send_request(Method::GET, &path, None, false).await
376    }
377
378    /// Requests a mark price.
379    ///
380    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
381    /// users from manipulating the market and causing the contract price to fluctuate.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the HTTP request fails or if the response body
386    /// cannot be parsed into [`OKXMarkPrice`].
387    ///
388    /// # References
389    ///
390    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
391    pub async fn http_get_mark_price(
392        &self,
393        params: GetMarkPriceParams,
394    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
395        let path = Self::build_path("/api/v5/public/mark-price", &params)?;
396        self.send_request(Method::GET, &path, None, false).await
397    }
398
399    /// Requests the latest index price.
400    ///
401    /// # References
402    ///
403    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
404    pub async fn http_get_index_ticker(
405        &self,
406        params: GetIndexTickerParams,
407    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
408        let path = Self::build_path("/api/v5/market/index-tickers", &params)?;
409        self.send_request(Method::GET, &path, None, false).await
410    }
411
412    /// Requests trades history.
413    ///
414    /// # References
415    ///
416    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
417    pub async fn http_get_trades(
418        &self,
419        params: GetTradesParams,
420    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
421        let path = Self::build_path("/api/v5/market/history-trades", &params)?;
422        self.send_request(Method::GET, &path, None, false).await
423    }
424
425    /// Requests recent candlestick data.
426    ///
427    /// # References
428    ///
429    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
430    pub async fn http_get_candlesticks(
431        &self,
432        params: GetCandlesticksParams,
433    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
434        let path = Self::build_path("/api/v5/market/candles", &params)?;
435        self.send_request(Method::GET, &path, None, false).await
436    }
437
438    /// Requests historical candlestick data.
439    ///
440    /// # References
441    ///
442    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
443    pub async fn http_get_candlesticks_history(
444        &self,
445        params: GetCandlesticksParams,
446    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
447        let path = Self::build_path("/api/v5/market/history-candles", &params)?;
448        self.send_request(Method::GET, &path, None, false).await
449    }
450
451    /// Lists current open orders.
452    ///
453    /// # References
454    ///
455    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-pending>
456    pub async fn http_get_pending_orders(
457        &self,
458        params: GetPendingOrdersParams,
459    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
460        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
461        self.send_request(Method::GET, &path, None, true).await
462    }
463
464    /// Retrieves a single order’s details.
465    ///
466    /// # References
467    ///
468    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
469    pub async fn http_get_order(
470        &self,
471        params: GetOrderParams,
472    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
473        let path = Self::build_path("/api/v5/trade/order", &params)?;
474        self.send_request(Method::GET, &path, None, true).await
475    }
476
477    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
478    /// in the trading account.
479    ///
480    /// # References
481    ///
482    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
483    pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
484        let path = "/api/v5/account/balance";
485        self.send_request(Method::GET, path, None, true).await
486    }
487
488    /// Requests historical order records.
489    ///
490    /// # References
491    ///
492    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
493    pub async fn http_get_order_history(
494        &self,
495        params: GetOrderHistoryParams,
496    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
497        let path = Self::build_path("/api/v5/trade/orders-history", &params)?;
498        self.send_request(Method::GET, &path, None, true).await
499    }
500
501    /// Requests order list (pending orders).
502    ///
503    /// # References
504    ///
505    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
506    pub async fn http_get_order_list(
507        &self,
508        params: GetOrderListParams,
509    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
510        let path = Self::build_path("/api/v5/trade/orders-pending", &params)?;
511        self.send_request(Method::GET, &path, None, true).await
512    }
513
514    /// Requests information on your positions. When the account is in net mode, net positions will
515    /// be displayed, and when the account is in long/short mode, long or short positions will be
516    /// displayed. Returns in reverse chronological order using ctime.
517    ///
518    /// # References
519    ///
520    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
521    pub async fn http_get_positions(
522        &self,
523        params: GetPositionsParams,
524    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
525        let path = Self::build_path("/api/v5/account/positions", &params)?;
526        self.send_request(Method::GET, &path, None, true).await
527    }
528
529    /// Requests closed or historical position data.
530    ///
531    /// # References
532    ///
533    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
534    pub async fn http_get_position_history(
535        &self,
536        params: GetPositionsHistoryParams,
537    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
538        let path = Self::build_path("/api/v5/account/positions-history", &params)?;
539        self.send_request(Method::GET, &path, None, true).await
540    }
541
542    /// Requests transaction details (fills) for the given parameters.
543    ///
544    /// # References
545    ///
546    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
547    pub async fn http_get_transaction_details(
548        &self,
549        params: GetTransactionDetailsParams,
550    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
551        let path = Self::build_path("/api/v5/trade/fills", &params)?;
552        self.send_request(Method::GET, &path, None, true).await
553    }
554}
555
556/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
557///
558/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
559/// into the Nautilus domain model.
560#[derive(Clone, Debug)]
561#[cfg_attr(
562    feature = "python",
563    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
564)]
565pub struct OKXHttpClient {
566    pub(crate) inner: Arc<OKXHttpInnerClient>,
567    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
568    cache_initialized: bool,
569}
570
571impl Default for OKXHttpClient {
572    fn default() -> Self {
573        Self::new(None, Some(60))
574    }
575}
576
577impl OKXHttpClient {
578    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
579    /// optionally overridden with a custom base url.
580    ///
581    /// This version of the client has **no credentials**, so it can only
582    /// call publicly accessible endpoints.
583    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
584        Self {
585            inner: Arc::new(OKXHttpInnerClient::new(base_url, timeout_secs)),
586            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
587            cache_initialized: false,
588        }
589    }
590
591    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
592    /// the default OKX HTTP base url.
593    pub fn from_env() -> anyhow::Result<Self> {
594        Self::with_credentials(None, None, None, None, None)
595    }
596
597    /// Creates a new [`OKXHttpClient`] configured with credentials
598    /// for authenticated requests, optionally using a custom base url.
599    pub fn with_credentials(
600        api_key: Option<String>,
601        api_secret: Option<String>,
602        api_passphrase: Option<String>,
603        base_url: Option<String>,
604        timeout_secs: Option<u64>,
605    ) -> anyhow::Result<Self> {
606        let api_key = api_key.unwrap_or(get_env_var("OKX_API_KEY")?);
607        let api_secret = api_secret.unwrap_or(get_env_var("OKX_API_SECRET")?);
608        let api_passphrase = api_passphrase.unwrap_or(get_env_var("OKX_API_PASSPHRASE")?);
609        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
610
611        Ok(Self {
612            inner: Arc::new(OKXHttpInnerClient::with_credentials(
613                api_key,
614                api_secret,
615                api_passphrase,
616                base_url,
617                timeout_secs,
618            )),
619            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
620            cache_initialized: false,
621        })
622    }
623
624    /// Retrieves an instrument from the cache.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the instrument is not found in the cache.
629    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
630        self.instruments_cache
631            .lock()
632            .expect("`instruments_cache` lock poisoned")
633            .get(&symbol)
634            .cloned()
635            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
636    }
637
638    async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
639        if let Ok(inst) = self.get_instrument_from_cache(symbol) {
640            return Ok(inst);
641        }
642
643        for group in [
644            OKXInstrumentType::Spot,
645            OKXInstrumentType::Margin,
646            OKXInstrumentType::Futures,
647        ] {
648            if let Ok(instruments) = self.request_instruments(group).await {
649                let mut guard = self.instruments_cache.lock().unwrap();
650                for inst in instruments {
651                    guard.insert(inst.raw_symbol().inner(), inst);
652                }
653                drop(guard);
654
655                if let Ok(inst) = self.get_instrument_from_cache(symbol) {
656                    return Ok(inst);
657                }
658            }
659        }
660
661        anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
662    }
663
664    /// Returns the base url being used by the client.
665    pub fn base_url(&self) -> &str {
666        self.inner.base_url.as_str()
667    }
668
669    /// Returns the public API key being used by the client.
670    pub fn api_key(&self) -> Option<&str> {
671        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
672    }
673
674    /// Checks if the client is initialized.
675    ///
676    /// The client is considered initialized if any instruments have been cached from the venue.
677    #[must_use]
678    pub const fn is_initialized(&self) -> bool {
679        self.cache_initialized
680    }
681
682    /// Generates a timestamp for initialization.
683    fn generate_ts_init(&self) -> UnixNanos {
684        get_atomic_clock_realtime().get_time_ns()
685    }
686
687    /// Returns the cached instrument symbols.
688    #[must_use]
689    /// Returns a snapshot of all instrument symbols currently held in the
690    /// internal cache.
691    ///
692    /// # Panics
693    ///
694    /// Panics if the internal mutex guarding the instrument cache is poisoned
695    /// (which would indicate a previous panic while the lock was held).
696    pub fn get_cached_symbols(&self) -> Vec<String> {
697        self.instruments_cache
698            .lock()
699            .unwrap()
700            .keys()
701            .map(std::string::ToString::to_string)
702            .collect()
703    }
704
705    /// Adds the `instruments` to the clients instrument cache.
706    ///
707    /// Any existing instruments will be replaced.
708    /// Inserts multiple instruments into the local cache.
709    ///
710    /// # Panics
711    ///
712    /// Panics if the instruments cache mutex is poisoned.
713    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
714        for inst in instruments {
715            self.instruments_cache
716                .lock()
717                .unwrap()
718                .insert(inst.raw_symbol().inner(), inst);
719        }
720        self.cache_initialized = true;
721    }
722
723    /// Adds the `instrument` to the clients instrument cache.
724    ///
725    /// Any existing instrument will be replaced.
726    /// Inserts a single instrument into the local cache.
727    ///
728    /// # Panics
729    ///
730    /// Panics if the instruments cache mutex is poisoned.
731    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
732        self.instruments_cache
733            .lock()
734            .unwrap()
735            .insert(instrument.raw_symbol().inner(), instrument);
736        self.cache_initialized = true;
737    }
738
739    /// Requests the account state for the `account_id` from OKX.
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if the HTTP request fails or no account state is returned.
744    pub async fn request_account_state(
745        &self,
746        account_id: AccountId,
747    ) -> anyhow::Result<AccountState> {
748        let resp = self
749            .inner
750            .http_get_balance()
751            .await
752            .map_err(|e| anyhow::anyhow!(e))?;
753
754        let ts_init = self.generate_ts_init();
755        let raw = resp
756            .first()
757            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
758        let account_state = parse_account_state(raw, account_id, ts_init)?;
759
760        Ok(account_state)
761    }
762
763    /// Sets the position mode for the account.
764    ///
765    /// Defaults to NetMode if no position mode is provided.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the HTTP request fails or the position mode cannot be set.
770    ///
771    /// # Note
772    ///
773    /// This endpoint only works for accounts with derivatives trading enabled.
774    /// If the account only has spot trading, this will return an error.
775    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
776        let mut params = SetPositionModeParamsBuilder::default();
777        params.pos_mode(position_mode);
778        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
779
780        match self.inner.http_set_position_mode(params).await {
781            Ok(_) => Ok(()),
782            Err(e) => {
783                // Check if this is the "Invalid request type" error for accounts without derivatives
784                if let crate::http::error::OKXHttpError::OkxError {
785                    error_code,
786                    message,
787                } = &e
788                    && error_code == "50115"
789                {
790                    tracing::warn!(
791                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
792                    );
793                    return Ok(()); // Gracefully handle this case
794                }
795                anyhow::bail!(e)
796            }
797        }
798    }
799
800    /// Requests all instruments for the `instrument_type` from OKX.
801    ///
802    /// # Errors
803    ///
804    /// Returns an error if the HTTP request fails or instrument parsing fails.
805    pub async fn request_instruments(
806        &self,
807        instrument_type: OKXInstrumentType,
808    ) -> anyhow::Result<Vec<InstrumentAny>> {
809        let mut params = GetInstrumentsParamsBuilder::default();
810        params.inst_type(instrument_type);
811        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
812
813        let resp = self
814            .inner
815            .http_get_instruments(params)
816            .await
817            .map_err(|e| anyhow::anyhow!(e))?;
818
819        let ts_init = self.generate_ts_init();
820
821        let mut instruments: Vec<InstrumentAny> = Vec::new();
822        for inst in &resp {
823            if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
824                instruments.push(instrument_any);
825            }
826        }
827
828        Ok(instruments)
829    }
830
831    /// Requests the latest mark price for the `instrument_type` from OKX.
832    ///
833    /// # Errors
834    ///
835    /// Returns an error if the HTTP request fails or no mark price is returned.
836    pub async fn request_mark_price(
837        &self,
838        instrument_id: InstrumentId,
839    ) -> anyhow::Result<MarkPriceUpdate> {
840        let mut params = GetMarkPriceParamsBuilder::default();
841        params.inst_id(instrument_id.symbol.inner());
842        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
843
844        let resp = self
845            .inner
846            .http_get_mark_price(params)
847            .await
848            .map_err(|e| anyhow::anyhow!(e))?;
849
850        let raw = resp
851            .first()
852            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
853        let inst = self
854            .instrument_or_fetch(instrument_id.symbol.inner())
855            .await?;
856        let ts_init = self.generate_ts_init();
857
858        let mark_price =
859            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
860                .map_err(|e| anyhow::anyhow!(e))?;
861        Ok(mark_price)
862    }
863
864    /// Requests the latest index price for the `instrument_id` from OKX.
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the HTTP request fails or no index price is returned.
869    pub async fn request_index_price(
870        &self,
871        instrument_id: InstrumentId,
872    ) -> anyhow::Result<IndexPriceUpdate> {
873        let mut params = GetIndexTickerParamsBuilder::default();
874        params.inst_id(instrument_id.symbol.inner());
875        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
876
877        let resp = self
878            .inner
879            .http_get_index_ticker(params)
880            .await
881            .map_err(|e| anyhow::anyhow!(e))?;
882
883        let raw = resp
884            .first()
885            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
886        let inst = self
887            .instrument_or_fetch(instrument_id.symbol.inner())
888            .await?;
889        let ts_init = self.generate_ts_init();
890
891        let index_price =
892            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
893                .map_err(|e| anyhow::anyhow!(e))?;
894        Ok(index_price)
895    }
896
897    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if the HTTP request fails or trade parsing fails.
902    pub async fn request_trades(
903        &self,
904        instrument_id: InstrumentId,
905        start: Option<DateTime<Utc>>,
906        end: Option<DateTime<Utc>>,
907        limit: Option<u32>,
908    ) -> anyhow::Result<Vec<TradeTick>> {
909        let mut params = GetTradesParamsBuilder::default();
910
911        params.inst_id(instrument_id.symbol.inner());
912        if let Some(s) = start {
913            params.before(s.timestamp_millis().to_string());
914        }
915        if let Some(e) = end {
916            params.after(e.timestamp_millis().to_string());
917        }
918        if let Some(l) = limit {
919            params.limit(l);
920        }
921
922        let params = params.build().map_err(anyhow::Error::new)?;
923
924        // Fetch raw trades
925        let raw_trades = self
926            .inner
927            .http_get_trades(params)
928            .await
929            .map_err(anyhow::Error::new)?;
930
931        let ts_init = self.generate_ts_init();
932        let inst = self
933            .instrument_or_fetch(instrument_id.symbol.inner())
934            .await?;
935
936        let mut trades = Vec::with_capacity(raw_trades.len());
937        for raw in raw_trades {
938            match parse_trade_tick(
939                &raw,
940                instrument_id,
941                inst.price_precision(),
942                inst.size_precision(),
943                ts_init,
944            ) {
945                Ok(trade) => trades.push(trade),
946                Err(e) => tracing::error!("{e}"),
947            }
948        }
949
950        Ok(trades)
951    }
952
953    /// Requests historical bars for the given bar type and time range.
954    ///
955    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
956    /// Returns bars sorted oldest to newest.
957    ///
958    /// # Endpoint Selection
959    ///
960    /// The OKX API has different endpoints with different limits:
961    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
962    ///   - Used when: start is None OR age ≤ 100 days
963    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
964    ///   - Used when: start is Some AND age > 100 days
965    ///
966    /// Age is calculated as `Utc::now() - start` at the time of the first request.
967    ///
968    /// # Supported Aggregations
969    ///
970    /// Maps to OKX bar query parameter:
971    /// - `Second` → `{n}s`
972    /// - `Minute` → `{n}m`
973    /// - `Hour` → `{n}H`
974    /// - `Day` → `{n}D`
975    /// - `Week` → `{n}W`
976    /// - `Month` → `{n}M`
977    ///
978    /// # Pagination
979    ///
980    /// - Uses `before` parameter for backwards pagination
981    /// - Pages backwards from end time (or now) to start time
982    /// - Stops when: limit reached, time window covered, or API returns empty
983    /// - Rate limit safety: ≥ 50ms between requests
984    ///
985    /// # Panics
986    ///
987    /// May panic if internal data structures are in an unexpected state.
988    ///
989    /// # References
990    ///
991    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
992    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
993    pub async fn request_bars(
994        &self,
995        bar_type: BarType,
996        start: Option<DateTime<Utc>>,
997        mut end: Option<DateTime<Utc>>,
998        limit: Option<u32>,
999    ) -> anyhow::Result<Vec<Bar>> {
1000        const HISTORY_SPLIT_DAYS: i64 = 100;
1001        const MAX_PAGES_SOFT: usize = 500;
1002
1003        let limit = if limit == Some(0) { None } else { limit };
1004
1005        anyhow::ensure!(
1006            bar_type.aggregation_source() == AggregationSource::External,
1007            "Only EXTERNAL aggregation is supported"
1008        );
1009        if let (Some(s), Some(e)) = (start, end) {
1010            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1011        }
1012
1013        let now = Utc::now();
1014        if let Some(s) = start
1015            && s > now
1016        {
1017            return Ok(Vec::new());
1018        }
1019        if let Some(e) = end
1020            && e > now
1021        {
1022            end = Some(now);
1023        }
1024
1025        let spec = bar_type.spec();
1026        let step = spec.step.get();
1027        let bar_param = match spec.aggregation {
1028            BarAggregation::Second => format!("{step}s"),
1029            BarAggregation::Minute => format!("{step}m"),
1030            BarAggregation::Hour => format!("{step}H"),
1031            BarAggregation::Day => format!("{step}D"),
1032            BarAggregation::Week => format!("{step}W"),
1033            BarAggregation::Month => format!("{step}M"),
1034            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1035        };
1036
1037        let slot_ms: i64 = match spec.aggregation {
1038            BarAggregation::Second => (step as i64) * 1_000,
1039            BarAggregation::Minute => (step as i64) * 60_000,
1040            BarAggregation::Hour => (step as i64) * 3_600_000,
1041            BarAggregation::Day => (step as i64) * 86_400_000,
1042            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1043            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1044            _ => unreachable!("Unsupported aggregation should have been caught above"),
1045        };
1046        let slot_ns: i64 = slot_ms * 1_000_000;
1047
1048        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1049        enum Mode {
1050            Latest,
1051            Backward,
1052            Range,
1053        }
1054
1055        let mode = match (start, end) {
1056            (None, None) => Mode::Latest,
1057            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
1058            (None, Some(_)) => Mode::Backward,
1059            (Some(_), Some(_)) => Mode::Range,
1060        };
1061
1062        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1063        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1064
1065        // Floor start and ceiling end to bar boundaries for cleaner API requests
1066        let start_ms = start.map(|s| {
1067            let ms = s.timestamp_millis();
1068            if slot_ms > 0 {
1069                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
1070            } else {
1071                ms
1072            }
1073        });
1074        let end_ms = end.map(|e| {
1075            let ms = e.timestamp_millis();
1076            if slot_ms > 0 {
1077                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
1078            } else {
1079                ms
1080            }
1081        });
1082        let now_ms = now.timestamp_millis();
1083
1084        let symbol = bar_type.instrument_id().symbol;
1085        let inst = self.instrument_or_fetch(symbol.inner()).await?;
1086
1087        let mut out: Vec<Bar> = Vec::new();
1088        let mut pages = 0usize;
1089
1090        // IMPORTANT: OKX API behavior:
1091        // - With 'after' parameter: returns bars with timestamp > after (going forward)
1092        // - With 'before' parameter: returns bars with timestamp < before (going backward)
1093        // For Range mode, we use 'before' starting from the end time to get bars before it
1094        let mut after_ms: Option<i64> = None;
1095        let mut before_ms: Option<i64> = match mode {
1096            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1097            Mode::Range => {
1098                // For Range, start from the end time (or current time if no end specified)
1099                // The API will return bars with timestamp < before_ms
1100                Some(end_ms.unwrap_or(now_ms))
1101            }
1102            Mode::Latest => None,
1103        };
1104
1105        // For Range mode, we'll paginate backwards like Backward mode
1106        let mut forward_prepend_mode = matches!(mode, Mode::Range);
1107
1108        // Adjust before_ms to ensure we get data from the API
1109        // OKX API might not have bars for the very recent past
1110        // This handles both explicit end=now and the actor layer setting end=now when it's None
1111        if matches!(mode, Mode::Backward | Mode::Range)
1112            && let Some(b) = before_ms
1113        {
1114            // OKX endpoints have different data availability windows:
1115            // - Regular endpoint: has most recent data but limited depth
1116            // - History endpoint: has deep history but lags behind current time
1117            // Use a small buffer to avoid the "dead zone"
1118            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
1119            if b >= now_ms.saturating_sub(buffer_ms) {
1120                before_ms = Some(now_ms.saturating_sub(buffer_ms));
1121            }
1122        }
1123
1124        let mut have_latest_first_page = false;
1125        let mut progressless_loops = 0u8;
1126
1127        loop {
1128            if let Some(lim) = limit
1129                && lim > 0
1130                && out.len() >= lim as usize
1131            {
1132                break;
1133            }
1134            if pages >= MAX_PAGES_SOFT {
1135                break;
1136            }
1137
1138            let pivot_ms = if let Some(a) = after_ms {
1139                a
1140            } else if let Some(b) = before_ms {
1141                b
1142            } else {
1143                now_ms
1144            };
1145            // Choose endpoint based on how old the data is:
1146            // - Use regular endpoint for recent data (< 1 hour old)
1147            // - Use history endpoint for older data (> 1 hour old)
1148            // This avoids the "gap" where history endpoint has no recent data
1149            // and regular endpoint has limited depth
1150            let age_ms = now_ms.saturating_sub(pivot_ms);
1151            let age_hours = age_ms / (60 * 60 * 1000);
1152            let using_history = age_hours > 1; // Use history if data is > 1 hour old
1153
1154            let page_ceiling = if using_history { 100 } else { 300 };
1155            let remaining = limit
1156                .filter(|&l| l > 0) // Treat limit=0 as no limit
1157                .map(|l| (l as usize).saturating_sub(out.len()))
1158                .unwrap_or(page_ceiling);
1159            let page_cap = remaining.min(page_ceiling);
1160
1161            let mut p = GetCandlesticksParamsBuilder::default();
1162            p.inst_id(symbol.as_str())
1163                .bar(&bar_param)
1164                .limit(page_cap as u32);
1165
1166            // Track whether this planned request uses BEFORE or AFTER.
1167            let mut req_used_before = false;
1168
1169            match mode {
1170                Mode::Latest => {
1171                    if have_latest_first_page && let Some(b) = before_ms {
1172                        p.before_ms(b);
1173                        req_used_before = true;
1174                    }
1175                }
1176                Mode::Backward => {
1177                    if let Some(b) = before_ms {
1178                        p.before_ms(b);
1179                        req_used_before = true;
1180                    }
1181                }
1182                Mode::Range => {
1183                    // For first request with regular endpoint, try without parameters
1184                    // to get the most recent bars, then filter
1185                    if pages == 0 && !using_history {
1186                        // Don't set any time parameters on first request
1187                        // This gets the most recent bars available
1188                    } else if forward_prepend_mode {
1189                        if let Some(b) = before_ms {
1190                            p.before_ms(b);
1191                            req_used_before = true;
1192                        }
1193                    } else if let Some(a) = after_ms {
1194                        p.after_ms(a);
1195                    }
1196                }
1197            }
1198
1199            let params = p.build().map_err(anyhow::Error::new)?;
1200
1201            let mut raw = if using_history {
1202                self.inner
1203                    .http_get_candlesticks_history(params.clone())
1204                    .await
1205                    .map_err(anyhow::Error::new)?
1206            } else {
1207                self.inner
1208                    .http_get_candlesticks(params.clone())
1209                    .await
1210                    .map_err(anyhow::Error::new)?
1211            };
1212
1213            // --- Fallbacks on empty page ---
1214            if raw.is_empty() {
1215                // LATEST: retry same cursor via history, then step back a page-interval before giving up
1216                if matches!(mode, Mode::Latest)
1217                    && have_latest_first_page
1218                    && !using_history
1219                    && let Some(b) = before_ms
1220                {
1221                    let mut p2 = GetCandlesticksParamsBuilder::default();
1222                    p2.inst_id(symbol.as_str())
1223                        .bar(&bar_param)
1224                        .limit(page_cap as u32);
1225                    p2.before_ms(b);
1226                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1227                    let raw2 = self
1228                        .inner
1229                        .http_get_candlesticks_history(params2)
1230                        .await
1231                        .map_err(anyhow::Error::new)?;
1232                    if !raw2.is_empty() {
1233                        raw = raw2;
1234                    } else {
1235                        // Step back one page interval and retry loop
1236                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1237                        before_ms = Some(b.saturating_sub(jump));
1238                        progressless_loops = progressless_loops.saturating_add(1);
1239                        if progressless_loops >= 3 {
1240                            break;
1241                        }
1242                        continue;
1243                    }
1244                }
1245
1246                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
1247
1248                // If still empty: for Range after first page, try a single backstep window using BEFORE
1249                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1250                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1251                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1252
1253                    let mut p2 = GetCandlesticksParamsBuilder::default();
1254                    p2.inst_id(symbol.as_str())
1255                        .bar(&bar_param)
1256                        .limit(page_cap as u32)
1257                        .before_ms(pivot_back);
1258                    let params2 = p2.build().map_err(anyhow::Error::new)?;
1259                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1260                        > HISTORY_SPLIT_DAYS
1261                    {
1262                        self.inner.http_get_candlesticks_history(params2).await
1263                    } else {
1264                        self.inner.http_get_candlesticks(params2).await
1265                    }
1266                    .map_err(anyhow::Error::new)?;
1267                    if raw2.is_empty() {
1268                        break;
1269                    } else {
1270                        raw = raw2;
1271                        forward_prepend_mode = true;
1272                        req_used_before = true;
1273                    }
1274                }
1275
1276                // First LATEST page empty: jump back >100d to force history, then continue loop
1277                if raw.is_empty()
1278                    && matches!(mode, Mode::Latest)
1279                    && !have_latest_first_page
1280                    && !using_history
1281                {
1282                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1283                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1284                    have_latest_first_page = true;
1285                    continue;
1286                }
1287
1288                // Still empty for any other case? Just break.
1289                if raw.is_empty() {
1290                    break;
1291                }
1292            }
1293            // --- end fallbacks ---
1294
1295            pages += 1;
1296
1297            // Parse, oldest → newest
1298            let ts_init = self.generate_ts_init();
1299            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1300            for r in &raw {
1301                page.push(parse_candlestick(
1302                    r,
1303                    bar_type,
1304                    inst.price_precision(),
1305                    inst.size_precision(),
1306                    ts_init,
1307                )?);
1308            }
1309            page.reverse();
1310
1311            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1312            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1313
1314            // Range filter (inclusive)
1315            // For Range mode, if we have no bars yet and this is an early page,
1316            // be more tolerant with the start boundary to handle gaps in data
1317            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1318                && out.is_empty()
1319                && pages < 2
1320            {
1321                // On first pages of Range mode with no data yet, include the most recent bar
1322                // even if it's slightly before our start time (within 2 bar periods)
1323                // BUT we want ALL bars in the page that are within our range
1324                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
1325
1326                // Debug: log the page range
1327                if !page.is_empty() {
1328                    tracing::debug!(
1329                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1330                        page.len(),
1331                        page.first().unwrap().ts_event.as_i64() / 1_000_000,
1332                        page.last().unwrap().ts_event.as_i64() / 1_000_000,
1333                        start_ms,
1334                        end_ms
1335                    );
1336                }
1337
1338                let result: Vec<Bar> = page
1339                    .clone()
1340                    .into_iter()
1341                    .filter(|b| {
1342                        let ts = b.ts_event.as_i64();
1343                        // Accept bars from (start - tolerance) to end
1344                        let ok_after =
1345                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1346                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1347                        ok_after && ok_before
1348                    })
1349                    .collect();
1350
1351                result
1352            } else {
1353                // Normal filtering
1354                page.clone()
1355                    .into_iter()
1356                    .filter(|b| {
1357                        let ts = b.ts_event.as_i64();
1358                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1359                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1360                        ok_after && ok_before
1361                    })
1362                    .collect()
1363            };
1364
1365            if !page.is_empty() && filtered.is_empty() {
1366                // For Range mode, if all bars are before our start time, there's no point continuing
1367                if matches!(mode, Mode::Range)
1368                    && !forward_prepend_mode
1369                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1370                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1371                {
1372                    // Bars are too old (more than 2 bar periods before start), stop
1373                    break;
1374                }
1375            }
1376
1377            // Track contribution for progress guard
1378            let contribution;
1379
1380            if out.is_empty() {
1381                contribution = filtered.len();
1382                out = filtered;
1383            } else {
1384                match mode {
1385                    Mode::Backward | Mode::Latest => {
1386                        if let Some(first) = out.first() {
1387                            filtered.retain(|b| b.ts_event < first.ts_event);
1388                        }
1389                        contribution = filtered.len();
1390                        if contribution != 0 {
1391                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1392                            new_out.extend_from_slice(&filtered);
1393                            new_out.extend_from_slice(&out);
1394                            out = new_out;
1395                        }
1396                    }
1397                    Mode::Range => {
1398                        if forward_prepend_mode || req_used_before {
1399                            // We are backfilling older pages: prepend them.
1400                            if let Some(first) = out.first() {
1401                                filtered.retain(|b| b.ts_event < first.ts_event);
1402                            }
1403                            contribution = filtered.len();
1404                            if contribution != 0 {
1405                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1406                                new_out.extend_from_slice(&filtered);
1407                                new_out.extend_from_slice(&out);
1408                                out = new_out;
1409                            }
1410                        } else {
1411                            // Normal forward: append newer pages.
1412                            if let Some(last) = out.last() {
1413                                filtered.retain(|b| b.ts_event > last.ts_event);
1414                            }
1415                            contribution = filtered.len();
1416                            out.extend(filtered);
1417                        }
1418                    }
1419                }
1420            }
1421
1422            // Duplicate-window mitigation for Latest/Backward
1423            if contribution == 0
1424                && matches!(mode, Mode::Latest | Mode::Backward)
1425                && let Some(b) = before_ms
1426            {
1427                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1428                let new_b = b.saturating_sub(jump);
1429                if new_b != b {
1430                    before_ms = Some(new_b);
1431                }
1432            }
1433
1434            if contribution == 0 {
1435                progressless_loops = progressless_loops.saturating_add(1);
1436                if progressless_loops >= 3 {
1437                    break;
1438                }
1439            } else {
1440                progressless_loops = 0;
1441
1442                // Advance cursors only when we made progress
1443                match mode {
1444                    Mode::Latest | Mode::Backward => {
1445                        if let Some(oldest) = page_oldest_ms {
1446                            before_ms = Some(oldest.saturating_sub(1));
1447                            have_latest_first_page = true;
1448                        } else {
1449                            break;
1450                        }
1451                    }
1452                    Mode::Range => {
1453                        if forward_prepend_mode || req_used_before {
1454                            if let Some(oldest) = page_oldest_ms {
1455                                // Move back by at least one bar period to avoid getting the same data
1456                                let jump_back = slot_ms.max(60_000); // At least 1 minute
1457                                before_ms = Some(oldest.saturating_sub(jump_back));
1458                                after_ms = None;
1459                            } else {
1460                                break;
1461                            }
1462                        } else if let Some(newest) = page_newest_ms {
1463                            after_ms = Some(newest.saturating_add(1));
1464                            before_ms = None;
1465                        } else {
1466                            break;
1467                        }
1468                    }
1469                }
1470            }
1471
1472            // Stop conditions
1473            if let Some(lim) = limit
1474                && lim > 0
1475                && out.len() >= lim as usize
1476            {
1477                break;
1478            }
1479            if let Some(ens) = end_ns
1480                && let Some(last) = out.last()
1481                && last.ts_event.as_i64() >= ens
1482            {
1483                break;
1484            }
1485            if let Some(sns) = start_ns
1486                && let Some(first) = out.first()
1487                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1488                && first.ts_event.as_i64() <= sns
1489            {
1490                // For Range mode, check if we have all bars up to the end time
1491                if matches!(mode, Mode::Range) {
1492                    // Don't stop if we haven't reached the end time yet
1493                    if let Some(ens) = end_ns
1494                        && let Some(last) = out.last()
1495                    {
1496                        let last_ts = last.ts_event.as_i64();
1497                        if last_ts < ens {
1498                            // We have bars before start but haven't reached end, need to continue forward
1499                            // Switch from backward to forward pagination
1500                            forward_prepend_mode = false;
1501                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1502                            before_ms = None;
1503                            continue;
1504                        }
1505                    }
1506                }
1507                break;
1508            }
1509
1510            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1511        }
1512
1513        // Final rescue for FORWARD/RANGE when nothing gathered
1514        if out.is_empty() && matches!(mode, Mode::Range) {
1515            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1516            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1517            let mut p = GetCandlesticksParamsBuilder::default();
1518            p.inst_id(symbol.as_str())
1519                .bar(&bar_param)
1520                .limit(300)
1521                .before_ms(pivot);
1522            let params = p.build().map_err(anyhow::Error::new)?;
1523            let raw = if hist {
1524                self.inner.http_get_candlesticks_history(params).await
1525            } else {
1526                self.inner.http_get_candlesticks(params).await
1527            }
1528            .map_err(anyhow::Error::new)?;
1529            if !raw.is_empty() {
1530                let ts_init = self.generate_ts_init();
1531                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1532                for r in &raw {
1533                    page.push(parse_candlestick(
1534                        r,
1535                        bar_type,
1536                        inst.price_precision(),
1537                        inst.size_precision(),
1538                        ts_init,
1539                    )?);
1540                }
1541                page.reverse();
1542                out = page
1543                    .into_iter()
1544                    .filter(|b| {
1545                        let ts = b.ts_event.as_i64();
1546                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1547                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1548                        ok_after && ok_before
1549                    })
1550                    .collect();
1551            }
1552        }
1553
1554        // Trim against end bound if needed (keep ≤ end)
1555        if let Some(ens) = end_ns {
1556            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
1557                out.pop();
1558            }
1559        }
1560
1561        // Clamp first bar for Range when using forward pagination
1562        if matches!(mode, Mode::Range)
1563            && !forward_prepend_mode
1564            && let Some(sns) = start_ns
1565        {
1566            let lower = sns.saturating_sub(slot_ns);
1567            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
1568                out.remove(0);
1569            }
1570        }
1571
1572        if let Some(lim) = limit
1573            && lim > 0
1574            && out.len() > lim as usize
1575        {
1576            out.truncate(lim as usize);
1577        }
1578
1579        Ok(out)
1580    }
1581
1582    /// Requests historical order status reports for the given parameters.
1583    ///
1584    /// # References
1585    ///
1586    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
1587    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
1588    #[allow(clippy::too_many_arguments)]
1589    pub async fn request_order_status_reports(
1590        &self,
1591        account_id: AccountId,
1592        instrument_type: Option<OKXInstrumentType>,
1593        instrument_id: Option<InstrumentId>,
1594        start: Option<DateTime<Utc>>,
1595        end: Option<DateTime<Utc>>,
1596        open_only: bool,
1597        limit: Option<u32>,
1598    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1599        // Build params for order history
1600        let mut history_params = GetOrderHistoryParamsBuilder::default();
1601
1602        let instrument_type = if let Some(instrument_type) = instrument_type {
1603            instrument_type
1604        } else {
1605            let instrument_id = instrument_id.ok_or_else(|| {
1606                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1607            })?;
1608            let instrument = self
1609                .instrument_or_fetch(instrument_id.symbol.inner())
1610                .await?;
1611            okx_instrument_type(&instrument)?
1612        };
1613
1614        history_params.inst_type(instrument_type);
1615
1616        if let Some(instrument_id) = instrument_id.as_ref() {
1617            history_params.inst_id(instrument_id.symbol.inner().to_string());
1618        }
1619
1620        if let Some(limit) = limit {
1621            history_params.limit(limit);
1622        }
1623
1624        let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
1625
1626        // Build params for pending orders
1627        let mut pending_params = GetOrderListParamsBuilder::default();
1628        pending_params.inst_type(instrument_type);
1629
1630        if let Some(instrument_id) = instrument_id.as_ref() {
1631            pending_params.inst_id(instrument_id.symbol.inner().to_string());
1632        }
1633
1634        if let Some(limit) = limit {
1635            pending_params.limit(limit);
1636        }
1637
1638        let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
1639
1640        let combined_resp = if open_only {
1641            // Only request pending/open orders
1642            self.inner
1643                .http_get_order_list(pending_params)
1644                .await
1645                .map_err(|e| anyhow::anyhow!(e))?
1646        } else {
1647            // Make both requests concurrently
1648            let (history_resp, pending_resp) = tokio::try_join!(
1649                self.inner.http_get_order_history(history_params),
1650                self.inner.http_get_order_list(pending_params)
1651            )
1652            .map_err(|e| anyhow::anyhow!(e))?;
1653
1654            // Combine both responses
1655            let mut combined_resp = history_resp;
1656            combined_resp.extend(pending_resp);
1657            combined_resp
1658        };
1659
1660        // Prepare time range filter
1661        let start_ns = start.map(UnixNanos::from);
1662        let end_ns = end.map(UnixNanos::from);
1663
1664        let ts_init = self.generate_ts_init();
1665        let mut reports = Vec::with_capacity(combined_resp.len());
1666
1667        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
1668        let mut seen = AHashSet::new();
1669
1670        for order in combined_resp {
1671            if seen.contains(&order.cl_ord_id) {
1672                continue; // Reserved pending already reported
1673            }
1674            seen.insert(order.cl_ord_id);
1675
1676            let inst = self.instrument_or_fetch(order.inst_id).await?;
1677
1678            let report = parse_order_status_report(
1679                &order,
1680                account_id,
1681                inst.id(),
1682                inst.price_precision(),
1683                inst.size_precision(),
1684                ts_init,
1685            );
1686
1687            if let Some(start_ns) = start_ns
1688                && report.ts_last < start_ns
1689            {
1690                continue;
1691            }
1692            if let Some(end_ns) = end_ns
1693                && report.ts_last > end_ns
1694            {
1695                continue;
1696            }
1697
1698            reports.push(report);
1699        }
1700
1701        Ok(reports)
1702    }
1703
1704    /// Requests fill reports (transaction details) for the given parameters.
1705    ///
1706    /// # References
1707    ///
1708    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
1709    pub async fn request_fill_reports(
1710        &self,
1711        account_id: AccountId,
1712        instrument_type: Option<OKXInstrumentType>,
1713        instrument_id: Option<InstrumentId>,
1714        start: Option<DateTime<Utc>>,
1715        end: Option<DateTime<Utc>>,
1716        limit: Option<u32>,
1717    ) -> anyhow::Result<Vec<FillReport>> {
1718        let mut params = GetTransactionDetailsParamsBuilder::default();
1719
1720        let instrument_type = if let Some(instrument_type) = instrument_type {
1721            instrument_type
1722        } else {
1723            let instrument_id = instrument_id.ok_or_else(|| {
1724                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1725            })?;
1726            let instrument = self
1727                .instrument_or_fetch(instrument_id.symbol.inner())
1728                .await?;
1729            okx_instrument_type(&instrument)?
1730        };
1731
1732        params.inst_type(instrument_type);
1733
1734        if let Some(instrument_id) = instrument_id {
1735            let instrument = self
1736                .instrument_or_fetch(instrument_id.symbol.inner())
1737                .await?;
1738            let instrument_type = okx_instrument_type(&instrument)?;
1739            params.inst_type(instrument_type);
1740            params.inst_id(instrument_id.symbol.inner().to_string());
1741        }
1742
1743        if let Some(limit) = limit {
1744            params.limit(limit);
1745        }
1746
1747        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1748
1749        let resp = self
1750            .inner
1751            .http_get_transaction_details(params)
1752            .await
1753            .map_err(|e| anyhow::anyhow!(e))?;
1754
1755        // Prepare time range filter
1756        let start_ns = start.map(UnixNanos::from);
1757        let end_ns = end.map(UnixNanos::from);
1758
1759        let ts_init = self.generate_ts_init();
1760        let mut reports = Vec::with_capacity(resp.len());
1761
1762        for detail in resp {
1763            let inst = self.instrument_or_fetch(detail.inst_id).await?;
1764
1765            let report = parse_fill_report(
1766                detail,
1767                account_id,
1768                inst.id(),
1769                inst.price_precision(),
1770                inst.size_precision(),
1771                ts_init,
1772            )?;
1773
1774            if let Some(start_ns) = start_ns
1775                && report.ts_event < start_ns
1776            {
1777                continue;
1778            }
1779
1780            if let Some(end_ns) = end_ns
1781                && report.ts_event > end_ns
1782            {
1783                continue;
1784            }
1785
1786            reports.push(report);
1787        }
1788
1789        Ok(reports)
1790    }
1791
1792    /// Requests current position status reports for the given parameters.
1793    ///
1794    /// # References
1795    ///
1796    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>.
1797    pub async fn request_position_status_reports(
1798        &self,
1799        account_id: AccountId,
1800        instrument_type: Option<OKXInstrumentType>,
1801        instrument_id: Option<InstrumentId>,
1802    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1803        let mut params = GetPositionsParamsBuilder::default();
1804
1805        let instrument_type = if let Some(instrument_type) = instrument_type {
1806            instrument_type
1807        } else {
1808            let instrument_id = instrument_id.ok_or_else(|| {
1809                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1810            })?;
1811            let instrument = self
1812                .instrument_or_fetch(instrument_id.symbol.inner())
1813                .await?;
1814            okx_instrument_type(&instrument)?
1815        };
1816
1817        params.inst_type(instrument_type);
1818
1819        instrument_id
1820            .as_ref()
1821            .map(|i| params.inst_id(i.symbol.inner()));
1822
1823        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1824
1825        let resp = self
1826            .inner
1827            .http_get_positions(params)
1828            .await
1829            .map_err(|e| anyhow::anyhow!(e))?;
1830
1831        let ts_init = self.generate_ts_init();
1832        let mut reports = Vec::with_capacity(resp.len());
1833
1834        for position in resp {
1835            let inst = self.instrument_or_fetch(position.inst_id).await?;
1836
1837            let report = parse_position_status_report(
1838                position,
1839                account_id,
1840                inst.id(),
1841                inst.size_precision(),
1842                ts_init,
1843            );
1844            reports.push(report);
1845        }
1846
1847        Ok(reports)
1848    }
1849}